* guangrong.xiao@xxxxxxxxx (guangrong.xiao@xxxxxxxxx) wrote:
> From: Xiao Guangrong <xiaoguangrong@xxxxxxxxxxx>
> 
> This module implements a lockless and efficient threaded workqueue.
> 
> Three abstracted objects are used in this module:
> - Request.
>   It contains not only the data that the workqueue fetches out
>   to complete the request but also the space to save the result
>   after the workqueue handles the request.
> 
>   It flows between the user and the workqueue. The user fills the
>   request data into it while it is owned by the user. After it is
>   submitted to the workqueue, the workqueue fetches the data out and
>   saves the result into it once the request has been handled.
> 
>   All the requests are pre-allocated and carefully partitioned between
>   threads so there is no contention on a request, which lets the
>   threads run in parallel as much as possible.
> 
> - User, i.e. the submitter
>   It is the one that fills the request and submits it to the workqueue;
>   the result will be collected after it is handled by the workqueue.
> 
>   The user can consecutively submit requests without waiting for the
>   previous requests to be handled.
>   Only one submitter is supported; you should serialize the submissions
>   yourself if you want more, e.g. by taking a lock on your side.
> 
> - Workqueue, i.e. thread
>   Each workqueue is represented by a running thread that fetches
>   the requests submitted by the user, does the specified work and
>   saves the result back into the request.
> 
> Signed-off-by: Xiao Guangrong <xiaoguangrong@xxxxxxxxxxx>
> ---
>  include/qemu/threaded-workqueue.h | 106 +++++++++
>  util/Makefile.objs                |   1 +
>  util/threaded-workqueue.c         | 463 ++++++++++++++++++++++++++++++++++++++
>  3 files changed, 570 insertions(+)
>  create mode 100644 include/qemu/threaded-workqueue.h
>  create mode 100644 util/threaded-workqueue.c
> 
> diff --git a/include/qemu/threaded-workqueue.h b/include/qemu/threaded-workqueue.h
> new file mode 100644
> index 0000000000..e0ede496d0
> --- /dev/null
> +++ b/include/qemu/threaded-workqueue.h
> @@ -0,0 +1,106 @@
> +/*
> + * Lockless and Efficient Threaded Workqueue Abstraction
> + *
> + * Author:
> + *   Xiao Guangrong <xiaoguangrong@xxxxxxxxxxx>
> + *
> + * Copyright(C) 2018 Tencent Corporation.
> + *
> + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
> + * See the COPYING.LIB file in the top-level directory.
> + */
> +
> +#ifndef QEMU_THREADED_WORKQUEUE_H
> +#define QEMU_THREADED_WORKQUEUE_H
> +
> +#include "qemu/queue.h"
> +#include "qemu/thread.h"
> +
> +/*
> + * This module implements a lockless and efficient threaded workqueue.
> + *
> + * Three abstracted objects are used in this module:
> + * - Request.
> + *   It contains not only the data that the workqueue fetches out
> + *   to complete the request but also the space to save the result
> + *   after the workqueue handles the request.
> + *
> + *   It flows between the user and the workqueue. The user fills the
> + *   request data into it while it is owned by the user. After it is
> + *   submitted to the workqueue, the workqueue fetches the data out and
> + *   saves the result into it once the request has been handled.
> + *
> + *   All the requests are pre-allocated and carefully partitioned between
> + *   threads so there is no contention on a request, which lets the
> + *   threads run in parallel as much as possible.
> + *
> + * - User, i.e. the submitter
> + *   It is the one that fills the request and submits it to the workqueue;
> + *   the result will be collected after it is handled by the workqueue.
> + *
> + *   The user can consecutively submit requests without waiting for the
> + *   previous requests to be handled.
> + *   Only one submitter is supported; you should serialize the submissions
> + *   yourself if you want more, e.g. by taking a lock on your side.
> + *
> + * - Workqueue, i.e. thread
> + *   Each workqueue is represented by a running thread that fetches
> + *   the requests submitted by the user, does the specified work and
> + *   saves the result back into the request.
> + */
> +
> +typedef struct Threads Threads;
> +
> +struct ThreadedWorkqueueOps {
> +    /* constructor of the request */
> +    int (*thread_request_init)(void *request);
> +    /* destructor of the request */
> +    void (*thread_request_uninit)(void *request);
> +
> +    /* the handler of the request that is called by the thread */
> +    void (*thread_request_handler)(void *request);
> +    /* called by the user after the request has been handled */
> +    void (*thread_request_done)(void *request);
> +
> +    size_t request_size;
> +};
> +typedef struct ThreadedWorkqueueOps ThreadedWorkqueueOps;
> +
> +/* the default number of requests that each thread needs to handle */
> +#define DEFAULT_THREAD_REQUEST_NR 4
> +/* the max number of requests that each thread needs to handle */
> +#define MAX_THREAD_REQUEST_NR (sizeof(uint64_t) * BITS_PER_BYTE)
> +
> +/*
> + * create a threaded workqueue. Other APIs will work on the Threads it returns
> + *
> + * @name: the identity of the workqueue which is used to construct the name
> + *    of threads only
> + * @threads_nr: the number of threads that the workqueue will create
> + * @thread_requests_nr: the number of requests that each single thread will
> + *    handle
> + * @ops: the handlers of the request
> + *
> + * Return NULL if it failed
> + */
> +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr,
> +                                   unsigned int thread_requests_nr,
> +                                   const ThreadedWorkqueueOps *ops);
> +void threaded_workqueue_destroy(Threads *threads);
> +
> +/*
> + * find a free request where the user can store the data that is needed to
> + * finish the request
> + *
> + * If all requests are used up, return NULL
> + */
> +void *threaded_workqueue_get_request(Threads *threads);
> +/* submit the request and notify the thread */
> +void threaded_workqueue_submit_request(Threads *threads, void *request);
> +
> +/*
> + * wait for all threads to complete their requests to make sure there is
> + * no previously submitted request still pending
> + */
> +void threaded_workqueue_wait_for_requests(Threads *threads);
> +#endif
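
As an aside - mostly to check that I've read the API correctly - I'd expect
a user of this to end up looking roughly like the sketch below.  It's
entirely my own invention (the MyRequest type, the my_* callbacks and the
example() wrapper don't exist anywhere in this series), and error handling
is omitted:

    #include "qemu/osdep.h"
    #include "qemu/threaded-workqueue.h"

    typedef struct {
        int input;
        int result;
    } MyRequest;

    static int my_request_init(void *request) { return 0; }
    static void my_request_uninit(void *request) { }

    static void my_request_handler(void *request)
    {
        MyRequest *req = request;

        req->result = req->input * 2;    /* the actual work, done in a thread */
    }

    static void my_request_done(void *request)
    {
        MyRequest *req = request;

        (void)req->result;               /* collected on the submitter side */
    }

    static const ThreadedWorkqueueOps my_ops = {
        .thread_request_init    = my_request_init,
        .thread_request_uninit  = my_request_uninit,
        .thread_request_handler = my_request_handler,
        .thread_request_done    = my_request_done,
        .request_size           = sizeof(MyRequest),
    };

    static void example(void)
    {
        Threads *threads = threaded_workqueue_create("example", 2,
                                                     DEFAULT_THREAD_REQUEST_NR,
                                                     &my_ops);
        MyRequest *req;

        while (!(req = threaded_workqueue_get_request(threads))) {
            /* every slot is in flight: wait, or do this unit of work here */
        }
        req->input = 42;
        threaded_workqueue_submit_request(threads, req);

        threaded_workqueue_wait_for_requests(threads); /* drain before reading results */
        threaded_workqueue_destroy(threads);
    }

If that is the intended flow, it might be worth putting a snippet like this
into the header comment.
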
> diff --git a/util/Makefile.objs b/util/Makefile.objs
> index 0820923c18..f26dfe5182 100644
> --- a/util/Makefile.objs
> +++ b/util/Makefile.objs
> @@ -50,5 +50,6 @@ util-obj-y += range.o
>  util-obj-y += stats64.o
>  util-obj-y += systemd.o
>  util-obj-y += iova-tree.o
> +util-obj-y += threaded-workqueue.o
>  util-obj-$(CONFIG_LINUX) += vfio-helpers.o
>  util-obj-$(CONFIG_OPENGL) += drm.o
> diff --git a/util/threaded-workqueue.c b/util/threaded-workqueue.c
> new file mode 100644
> index 0000000000..2ab37cee8d
> --- /dev/null
> +++ b/util/threaded-workqueue.c
> @@ -0,0 +1,463 @@
> +/*
> + * Lockless and Efficient Threaded Workqueue Abstraction
> + *
> + * Author:
> + *   Xiao Guangrong <xiaoguangrong@xxxxxxxxxxx>
> + *
> + * Copyright(C) 2018 Tencent Corporation.
> + *
> + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
> + * See the COPYING.LIB file in the top-level directory.
> + */
> +
> +#include "qemu/osdep.h"
> +#include "qemu/bitmap.h"
> +#include "qemu/threaded-workqueue.h"
> +
> +#define SMP_CACHE_BYTES 64

That's architecture dependent, isn't it?

> +
> +/*
> + * the request representation which contains the internally used metadata;
> + * it is the header of the user-defined data.
> + *
> + * It should be aligned to the natural word size of the CPU.
> + */
> +struct ThreadRequest {
> +    /*
> +     * the request has been handled by the thread and the user needs
> +     * to fetch the result out.
> +     */
> +    uint8_t done;
> +
> +    /*
> +     * the index into ThreadLocal::requests.
> +     * Save it in the padding space although it can be calculated at runtime.
> +     */
> +    uint8_t request_index;
> +
> +    /* the index into Threads::per_thread_data */
> +    unsigned int thread_index;
> +} QEMU_ALIGNED(sizeof(unsigned long));
> +typedef struct ThreadRequest ThreadRequest;
> +
> +struct ThreadLocal {
> +    struct Threads *threads;
> +
> +    /* the index of the thread */
> +    int self;
> +
> +    /* thread is useless and needs to exit */
> +    bool quit;
> +
> +    QemuThread thread;
> +
> +    void *requests;
> +
> +    /*
> +     * each bit in these two bitmaps corresponds to an entry of @requests.
> +     * If the two bits are the same, the corresponding request is free and
> +     * owned by the user, i.e. it is where the user fills a request. Otherwise,
> +     * it is valid and owned by the thread, i.e. it is where the thread fetches
> +     * the request and writes the result.
> +     */
> +
> +    /* after the user fills the request, the bit is flipped. */
> +    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
> +    /* after handling the request, the thread flips the bit. */
> +    uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);

Patchew complained about some type mismatches; I think those are because
you're using the bitmap_* functions on these; those functions always
operate on 'long', not on uint64_t - and on some platforms they're
unfortunately not the same.
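
Something like the following would keep those helpers happy on 32-bit hosts
as well - just a sketch from me (the ThreadLocalBitmaps/pending_requests
names are invented, and I've left out the QEMU_ALIGNED() padding and the
atomic_rcu_* accesses the real code wraps around these fields):

    #include "qemu/osdep.h"
    #include "qemu/bitmap.h"
    #include "qemu/threaded-workqueue.h"

    /*
     * DECLARE_BITMAP() gives an 'unsigned long[]' of the right length,
     * which is what bitmap_xor()/find_first_bit() are written for; on a
     * 32-bit host that is two words rather than one uint64_t.
     */
    struct ThreadLocalBitmaps {
        DECLARE_BITMAP(request_fill_bitmap, MAX_THREAD_REQUEST_NR);
        DECLARE_BITMAP(request_done_bitmap, MAX_THREAD_REQUEST_NR);
    };

    static void pending_requests(struct ThreadLocalBitmaps *t,
                                 unsigned long *result, long nbits)
    {
        /* every argument is 'unsigned long *', so no type punning is needed */
        bitmap_xor(result, t->request_fill_bitmap, t->request_done_bitmap, nbits);
    }
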
Dave

> +    /*
> +     * the event used to wake up the thread whenever a valid request has
> +     * been submitted
> +     */
> +    QemuEvent request_valid_ev QEMU_ALIGNED(SMP_CACHE_BYTES);
> +
> +    /*
> +     * the event is notified whenever a request has been completed
> +     * (i.e. becomes free), which is used to wake up the user
> +     */
> +    QemuEvent request_free_ev QEMU_ALIGNED(SMP_CACHE_BYTES);
> +};
> +typedef struct ThreadLocal ThreadLocal;
> +
> +/*
> + * the main data struct that represents the multithreaded workqueue,
> + * shared by all threads
> + */
> +struct Threads {
> +    /* the size of each request; the ThreadRequest header is included */
> +    unsigned int request_size;
> +    unsigned int thread_requests_nr;
> +    unsigned int threads_nr;
> +
> +    /* requests are pushed to the threads in a round-robin manner */
> +    unsigned int current_thread_index;
> +
> +    const ThreadedWorkqueueOps *ops;
> +
> +    ThreadLocal per_thread_data[0];
> +};
> +typedef struct Threads Threads;
> +
> +static ThreadRequest *index_to_request(ThreadLocal *thread, int request_index)
> +{
> +    ThreadRequest *request;
> +
> +    request = thread->requests + request_index * thread->threads->request_size;
> +    assert(request->request_index == request_index);
> +    assert(request->thread_index == thread->self);
> +    return request;
> +}
> +
> +static int request_to_index(ThreadRequest *request)
> +{
> +    return request->request_index;
> +}
> +
> +static int request_to_thread_index(ThreadRequest *request)
> +{
> +    return request->thread_index;
> +}
> +
> +/*
> + * free request: the request is not used by any thread; however, it might
> + * contain a result that needs the user to call thread_request_done()
> + *
> + * valid request: the request contains the request data and is committed
> + * to the thread, i.e. it's owned by the thread.
> + */
> +static uint64_t get_free_request_bitmap(Threads *threads, ThreadLocal *thread)
> +{
> +    uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap;
> +
> +    request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap);
> +    request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap);
> +    bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap,
> +               threads->thread_requests_nr);
> +
> +    /*
> +     * paired with smp_wmb() in mark_request_free() to make sure that we
> +     * read request_done_bitmap before fetching the result out.
> +     */
> +    smp_rmb();
> +
> +    return result_bitmap;
> +}
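
Just to convince myself that I follow the ownership rule here, a tiny
standalone illustration (plain C, nothing from the patch; the values are
made up):

    #include <inttypes.h>
    #include <stdio.h>

    int main(void)
    {
        /* bit n of each word describes request n of one worker thread */
        uint64_t fill = 0x6;        /* requests 1 and 2 have been submitted */
        uint64_t done = 0x2;        /* request 1 has also been completed    */
        uint64_t owned_by_thread = fill ^ done;

        /*
         * prints 0x4: only request 2 is still owned by the worker thread;
         * 0 and 3 were never submitted and 1 is finished, so all three are
         * free and may be handed out by threaded_workqueue_get_request()
         * again (after request_done() collects request 1's result).
         */
        printf("owned_by_thread = 0x%" PRIx64 "\n", owned_by_thread);
        return 0;
    }

i.e. a set bit in the xor means the request is in flight and owned by the
worker, a clear bit means the submitter may reuse the slot - which matches
the comment above, I think.
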
> +
> +static ThreadRequest
> +*find_thread_free_request(Threads *threads, ThreadLocal *thread)
> +{
> +    uint64_t result_bitmap = get_free_request_bitmap(threads, thread);
> +    int index;
> +
> +    index = find_first_zero_bit(&result_bitmap, threads->thread_requests_nr);
> +    if (index >= threads->thread_requests_nr) {
> +        return NULL;
> +    }
> +
> +    return index_to_request(thread, index);
> +}
> +
> +static ThreadRequest *threads_find_free_request(Threads *threads)
> +{
> +    ThreadLocal *thread;
> +    ThreadRequest *request;
> +    int cur_thread, thread_index;
> +
> +    cur_thread = threads->current_thread_index % threads->threads_nr;
> +    thread_index = cur_thread;
> +    do {
> +        thread = threads->per_thread_data + thread_index++;
> +        request = find_thread_free_request(threads, thread);
> +        if (request) {
> +            break;
> +        }
> +        thread_index %= threads->threads_nr;
> +    } while (thread_index != cur_thread);
> +
> +    return request;
> +}
> +
> +/*
> + * the change-bit operation combined with READ_ONCE and WRITE_ONCE, which
> + * only works on a single uint64_t
> + */
> +static void change_bit_once(long nr, uint64_t *addr)
> +{
> +    uint64_t value = atomic_rcu_read(addr) ^ BIT_MASK(nr);
> +
> +    atomic_rcu_set(addr, value);
> +}
> +
> +static void mark_request_valid(Threads *threads, ThreadRequest *request)
> +{
> +    int thread_index = request_to_thread_index(request);
> +    int request_index = request_to_index(request);
> +    ThreadLocal *thread = threads->per_thread_data + thread_index;
> +
> +    /*
> +     * paired with smp_rmb() in thread_find_first_valid_request_index() to
> +     * make sure the request has been filled before the bit is flipped,
> +     * which makes the request visible to the thread
> +     */
> +    smp_wmb();
> +
> +    change_bit_once(request_index, &thread->request_fill_bitmap);
> +    qemu_event_set(&thread->request_valid_ev);
> +}
> +
> +static int thread_find_first_valid_request_index(ThreadLocal *thread)
> +{
> +    Threads *threads = thread->threads;
> +    uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap;
> +    int index;
> +
> +    request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap);
> +    request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap);
> +    bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap,
> +               threads->thread_requests_nr);
> +    /*
> +     * paired with smp_wmb() in mark_request_valid() to make sure that
> +     * we read request_fill_bitmap before fetching the request out.
> +     */
> +    smp_rmb();
> +
> +    index = find_first_bit(&result_bitmap, threads->thread_requests_nr);
> +    return index >= threads->thread_requests_nr ? -1 : index;
> +}
> +
> +static void mark_request_free(ThreadLocal *thread, ThreadRequest *request)
> +{
> +    int index = request_to_index(request);
> +
> +    /*
> +     * smp_wmb() is implied in change_bit_atomic(), which is paired with
> +     * smp_rmb() in get_free_request_bitmap() to make sure the result
> +     * has been saved before the bit is flipped.
> +     */
> +    change_bit_atomic(index, &thread->request_done_bitmap);
> +    qemu_event_set(&thread->request_free_ev);
> +}
> +
> +/* retry to see if there is an available request before actually going to wait. */
> +#define BUSY_WAIT_COUNT 1000
> +
> +static ThreadRequest *
> +thread_busy_wait_for_request(ThreadLocal *thread)
> +{
> +    int index, count = 0;
> +
> +    for (count = 0; count < BUSY_WAIT_COUNT; count++) {
> +        index = thread_find_first_valid_request_index(thread);
> +        if (index >= 0) {
> +            return index_to_request(thread, index);
> +        }
> +
> +        cpu_relax();
> +    }
> +
> +    return NULL;
> +}
> +
> +static void *thread_run(void *opaque)
> +{
> +    ThreadLocal *self_data = (ThreadLocal *)opaque;
> +    Threads *threads = self_data->threads;
> +    void (*handler)(void *request) = threads->ops->thread_request_handler;
> +    ThreadRequest *request;
> +
> +    for ( ; !atomic_read(&self_data->quit); ) {
> +        qemu_event_reset(&self_data->request_valid_ev);
> +
> +        request = thread_busy_wait_for_request(self_data);
> +        if (!request) {
> +            qemu_event_wait(&self_data->request_valid_ev);
> +            continue;
> +        }
> +
> +        assert(!request->done);
> +
> +        handler(request + 1);
> +        request->done = true;
> +        mark_request_free(self_data, request);
> +    }
> +
> +    return NULL;
> +}
> +
> +static void uninit_thread_requests(ThreadLocal *thread, int free_nr)
> +{
> +    Threads *threads = thread->threads;
> +    ThreadRequest *request = thread->requests;
> +    int i;
> +
> +    for (i = 0; i < free_nr; i++) {
> +        threads->ops->thread_request_uninit(request + 1);
> +        request = (void *)request + threads->request_size;
> +    }
> +    g_free(thread->requests);
> +}
> +
> +static int init_thread_requests(ThreadLocal *thread)
> +{
> +    Threads *threads = thread->threads;
> +    ThreadRequest *request;
> +    int ret, i, thread_reqs_size;
> +
> +    thread_reqs_size = threads->thread_requests_nr * threads->request_size;
> +    thread_reqs_size = QEMU_ALIGN_UP(thread_reqs_size, SMP_CACHE_BYTES);
> +    thread->requests = g_malloc0(thread_reqs_size);
> +
> +    request = thread->requests;
> +    for (i = 0; i < threads->thread_requests_nr; i++) {
> +        ret = threads->ops->thread_request_init(request + 1);
> +        if (ret < 0) {
> +            goto exit;
> +        }
> +
> +        request->request_index = i;
> +        request->thread_index = thread->self;
> +        request = (void *)request + threads->request_size;
> +    }
> +    return 0;
> +
> +exit:
> +    uninit_thread_requests(thread, i);
> +    return -1;
> +}
> +
> +static void uninit_thread_data(Threads *threads, int free_nr)
> +{
> +    ThreadLocal *thread_local = threads->per_thread_data;
> +    int i;
> +
> +    for (i = 0; i < free_nr; i++) {
> +        thread_local[i].quit = true;
> +        qemu_event_set(&thread_local[i].request_valid_ev);
> +        qemu_thread_join(&thread_local[i].thread);
> +        qemu_event_destroy(&thread_local[i].request_valid_ev);
> +        qemu_event_destroy(&thread_local[i].request_free_ev);
> +        uninit_thread_requests(&thread_local[i], threads->thread_requests_nr);
> +    }
> +}
> +
> +static int
> +init_thread_data(Threads *threads, const char *thread_name, int thread_nr)
> +{
> +    ThreadLocal *thread_local = threads->per_thread_data;
> +    char *name;
> +    int i;
> +
> +    for (i = 0; i < thread_nr; i++) {
> +        thread_local[i].threads = threads;
> +        thread_local[i].self = i;
> +
> +        if (init_thread_requests(&thread_local[i]) < 0) {
> +            goto exit;
> +        }
> +
> +        qemu_event_init(&thread_local[i].request_free_ev, false);
> +        qemu_event_init(&thread_local[i].request_valid_ev, false);
> +
> +        name = g_strdup_printf("%s/%d", thread_name, thread_local[i].self);
> +        qemu_thread_create(&thread_local[i].thread, name,
> +                           thread_run, &thread_local[i], QEMU_THREAD_JOINABLE);
> +        g_free(name);
> +    }
> +    return 0;
> +
> +exit:
> +    uninit_thread_data(threads, i);
> +    return -1;
> +}
> +
> +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr,
> +                                   unsigned int thread_requests_nr,
> +                                   const ThreadedWorkqueueOps *ops)
> +{
> +    Threads *threads;
> +
> +    if (threads_nr > MAX_THREAD_REQUEST_NR) {
> +        return NULL;
> +    }
> +
> +    threads = g_malloc0(sizeof(*threads) + threads_nr * sizeof(ThreadLocal));
> +    threads->ops = ops;
> +    threads->threads_nr = threads_nr;
> +    threads->thread_requests_nr = thread_requests_nr;
> +
> +    QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(sizeof(ThreadRequest), sizeof(long)));
> +    threads->request_size = threads->ops->request_size;
> +    threads->request_size = QEMU_ALIGN_UP(threads->request_size, sizeof(long));
> +    threads->request_size += sizeof(ThreadRequest);
> +
> +    if (init_thread_data(threads, name, threads_nr) < 0) {
> +        g_free(threads);
> +        return NULL;
> +    }
> +
> +    return threads;
> +}
> +
> +void threaded_workqueue_destroy(Threads *threads)
> +{
> +    uninit_thread_data(threads, threads->threads_nr);
> +    g_free(threads);
> +}
> +
> +static void request_done(Threads *threads, ThreadRequest *request)
> +{
> +    if (!request->done) {
> +        return;
> +    }
> +
> +    threads->ops->thread_request_done(request + 1);
> +    request->done = false;
> +}
> +
> +void *threaded_workqueue_get_request(Threads *threads)
> +{
> +    ThreadRequest *request;
> +
> +    request = threads_find_free_request(threads);
> +    if (!request) {
> +        return NULL;
> +    }
> +
> +    request_done(threads, request);
> +    return request + 1;
> +}
> +
> +void threaded_workqueue_submit_request(Threads *threads, void *request)
> +{
> +    ThreadRequest *req = request - sizeof(ThreadRequest);
> +    int thread_index = request_to_thread_index(request);
> +
> +    assert(!req->done);
> +    mark_request_valid(threads, req);
> +    threads->current_thread_index = thread_index + 1;
> +}
> +
> +void threaded_workqueue_wait_for_requests(Threads *threads)
> +{
> +    ThreadLocal *thread;
> +    uint64_t result_bitmap;
> +    int thread_index, index = 0;
> +
> +    for (thread_index = 0; thread_index < threads->threads_nr; thread_index++) {
> +        thread = threads->per_thread_data + thread_index;
> +        index = 0;
> +retry:
> +        qemu_event_reset(&thread->request_free_ev);
> +        result_bitmap = get_free_request_bitmap(threads, thread);
> +
> +        for (; index < threads->thread_requests_nr; index++) {
> +            if (test_bit(index, &result_bitmap)) {
> +                qemu_event_wait(&thread->request_free_ev);
> +                goto retry;
> +            }
> +
> +            request_done(threads, index_to_request(thread, index));
> +        }
> +    }
> +}
> -- 
> 2.14.5
> 

-- 
Dr. David Alan Gilbert / dgilbert@xxxxxxxxxx / Manchester, UK