[RFC PATCH v0.1 9/9] selftests/umcg: add UMCG server/worker API selftest

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Add UMCG server/worker API selftests. These are only basic
tests, they do not cover many important use cases/conditions.

More to come.

Signed-off-by: Peter Oskolkov <posk@xxxxxxxxxx>
---
 tools/testing/selftests/umcg/.gitignore  |   1 +
 tools/testing/selftests/umcg/Makefile    |   4 +-
 tools/testing/selftests/umcg/umcg_test.c | 475 +++++++++++++++++++++++
 3 files changed, 479 insertions(+), 1 deletion(-)
 create mode 100644 tools/testing/selftests/umcg/umcg_test.c

diff --git a/tools/testing/selftests/umcg/.gitignore b/tools/testing/selftests/umcg/.gitignore
index 89cca24e5907..f488ec82882a 100644
--- a/tools/testing/selftests/umcg/.gitignore
+++ b/tools/testing/selftests/umcg/.gitignore
@@ -1,2 +1,3 @@
 # SPDX-License-Identifier: GPL-2.0-only
 umcg_core_test
+umcg_test
diff --git a/tools/testing/selftests/umcg/Makefile b/tools/testing/selftests/umcg/Makefile
index b151098e2ed1..916897d82e53 100644
--- a/tools/testing/selftests/umcg/Makefile
+++ b/tools/testing/selftests/umcg/Makefile
@@ -6,8 +6,10 @@ LIBUMCGDIR := $(TOOLSDIR)/lib/umcg
 CFLAGS += -g -O0 -I$(LIBUMCGDIR) -I$(TOOLSDIR)/include/ -I../../../../usr/include/
 LDLIBS += -lpthread -static
 
-TEST_GEN_PROGS := umcg_core_test
+TEST_GEN_PROGS := umcg_core_test umcg_test
 
 include ../lib.mk
 
 $(OUTPUT)/umcg_core_test: umcg_core_test.c $(LIBUMCGDIR)/libumcg.c
+
+$(OUTPUT)/umcg_test: umcg_test.c $(LIBUMCGDIR)/libumcg.c
diff --git a/tools/testing/selftests/umcg/umcg_test.c b/tools/testing/selftests/umcg/umcg_test.c
new file mode 100644
index 000000000000..2c01a61ec3f4
--- /dev/null
+++ b/tools/testing/selftests/umcg/umcg_test.c
@@ -0,0 +1,475 @@
+// SPDX-License-Identifier: GPL-2.0
+#define _GNU_SOURCE
+#include "libumcg.h"
+
+#include <pthread.h>
+#include <stdatomic.h>
+
+#include "../kselftest_harness.h"
+
+#define CHECK_CONFIG()						\
+{								\
+	int ret = sys_umcg_api_version(1, 0);	\
+								\
+	if (ret == -1 && errno == ENOSYS)			\
+		SKIP(return, "CONFIG_UMCG not set");	\
+}
+
+struct worker_args {
+	umcg_t		group;  /* Which group the worker should join. */
+	umcg_tid	utid;   /* This worker's utid. */
+	void *(*thread_fn)(void *);  /* Function to run. */
+	void		*thread_arg;
+	intptr_t	tag;
+};
+
+static void validate_state(umcg_tid utid, u32 expected, const char *ctx)
+{
+	u32 state = umcg_get_task_state(utid);
+
+	if (state == expected)
+		return;
+
+	fprintf(stderr, "BAD state for %ld: expected: %u; got: %u; ctx :%s\n",
+			utid, expected, state, ctx);
+	exit(1);
+}
+
+static void *worker_fn(void *arg)
+{
+	void *result;
+	umcg_tid utid;
+	struct worker_args *args = (struct worker_args *)arg;
+
+	validate_state(umcg_get_utid(), UMCG_TASK_NONE, "worker_fn start");
+
+	atomic_thread_fence(memory_order_acquire);
+	atomic_store_explicit(&args->utid, umcg_get_utid(),
+			memory_order_seq_cst);
+
+	utid = umcg_register_worker(args->group, args->tag);
+	if (args->utid != utid) {
+		fprintf(stderr, "umcg_register_worker failed.\n");
+		exit(1);
+	}
+	validate_state(umcg_get_utid(), UMCG_TASK_RUNNING, "worker_fn in");
+
+	/* Fence args->thread_arg */
+	atomic_thread_fence(memory_order_acquire);
+
+	result = args->thread_fn(args->thread_arg);
+	validate_state(umcg_get_utid(), UMCG_TASK_RUNNING, "worker_fn out");
+
+	if (umcg_unregister_task()) {
+		fprintf(stderr, "umcg_unregister_task failed.\n");
+		exit(1);
+	}
+	validate_state(umcg_get_utid(), UMCG_TASK_NONE, "worker_fn finish");
+
+	return result;
+}
+
+static void *simple_running_worker(void *arg)
+{
+	bool *checkpoint = (bool *)arg;
+
+	atomic_store_explicit(checkpoint, true, memory_order_relaxed);
+	return NULL;
+}
+
+TEST(umcg_poll_run_test) {
+	pthread_t worker;
+	bool checkpoint = false;
+	struct worker_args worker_args;
+
+	CHECK_CONFIG();
+
+	worker_args.utid = UMCG_NONE;
+	worker_args.group = umcg_create_group(0);
+	ASSERT_NE(UMCG_NONE, worker_args.group);
+
+	worker_args.thread_fn = &simple_running_worker;
+	worker_args.thread_arg = &checkpoint;
+	worker_args.tag = 0;
+
+	ASSERT_EQ(0, pthread_create(&worker, NULL, &worker_fn, &worker_args));
+
+	/* Wait for the worker to start. */
+	while (UMCG_NONE == atomic_load_explicit(&worker_args.utid,
+				memory_order_relaxed))
+		;
+
+	/*
+	 * Make sure that the worker does not checkpoint until the server
+	 * runs it.
+	 */
+	usleep(1000);
+	ASSERT_FALSE(atomic_load_explicit(&checkpoint, memory_order_relaxed));
+
+	ASSERT_NE(0, umcg_register_server(worker_args.group, 0));
+
+	/*
+	 * Run the worker until it exits. Need to loop because the worker
+	 * may pagefault and wake the server.
+	 */
+	do {
+		u32 state;
+
+		/* Poll the worker. */
+		ASSERT_EQ(worker_args.utid, umcg_poll_worker());
+		validate_state(worker_args.utid, UMCG_TASK_RUNNABLE, "wns poll");
+
+		umcg_tid utid = umcg_run_worker(worker_args.utid);
+		if (utid == UMCG_NONE) {
+			ASSERT_EQ(0, errno);
+			break;
+		}
+
+		ASSERT_EQ(utid, worker_args.utid);
+
+		state = umcg_get_task_state(utid);
+		ASSERT_TRUE(state == UMCG_TASK_BLOCKED || UMCG_TASK_UNBLOCKED);
+	} while (true);
+
+	ASSERT_TRUE(atomic_load_explicit(&checkpoint, memory_order_relaxed));
+
+	/* Can't destroy group while this thread still belongs to it. */
+	ASSERT_NE(0, umcg_destroy_group(worker_args.group));
+	ASSERT_EQ(0, umcg_unregister_task());
+	ASSERT_EQ(0, umcg_destroy_group(worker_args.group));
+	ASSERT_EQ(0, pthread_join(worker, NULL));
+}
+
+static void *sleeping_worker(void *arg)
+{
+	int *checkpoint = (int *)arg;
+
+	atomic_store_explicit(checkpoint, 1, memory_order_relaxed);
+	usleep(2000);
+	atomic_store_explicit(checkpoint, 2, memory_order_relaxed);
+
+	return NULL;
+}
+
+TEST(umcg_sleep_test) {
+	pthread_t worker;
+	u32 state;
+	int checkpoint = 0;
+	struct worker_args worker_args;
+
+	CHECK_CONFIG();
+
+	worker_args.utid = UMCG_NONE;
+	worker_args.group = umcg_create_group(0);
+	ASSERT_NE(UMCG_NONE, worker_args.group);
+
+	worker_args.thread_fn = &sleeping_worker;
+	worker_args.thread_arg = &checkpoint;
+	worker_args.tag = 0;
+
+	ASSERT_EQ(0, pthread_create(&worker, NULL, &worker_fn, &worker_args));
+
+	/* Wait for the worker to start. */
+	while (UMCG_NONE == atomic_load_explicit(&worker_args.utid,
+				memory_order_relaxed))
+		;
+
+	/*
+	 * Make sure that the worker does not checkpoint until the server
+	 * runs it.
+	 */
+	usleep(1000);
+	ASSERT_EQ(0, atomic_load_explicit(&checkpoint, memory_order_relaxed));
+
+	validate_state(umcg_get_utid(), UMCG_TASK_NONE, "sws prereg");
+
+	ASSERT_NE(0, umcg_register_server(worker_args.group, 0));
+
+	validate_state(umcg_get_utid(), UMCG_TASK_PROCESSING, "sws postreg");
+
+	/*
+	 * Run the worker until it checkpoints 1. Need to loop because
+	 * the worker may pagefault and wake the server.
+	 */
+	do {
+		ASSERT_EQ(worker_args.utid, umcg_poll_worker());
+		validate_state(worker_args.utid, UMCG_TASK_RUNNABLE,
+				"sws poll");
+
+		umcg_tid utid = umcg_run_worker(worker_args.utid);
+		ASSERT_EQ(utid, worker_args.utid);
+	} while (1 != atomic_load_explicit(&checkpoint, memory_order_relaxed));
+
+	state = umcg_get_task_state(worker_args.utid);
+	ASSERT_TRUE(state == UMCG_TASK_BLOCKED || UMCG_TASK_UNBLOCKED);
+	validate_state(umcg_get_utid(), UMCG_TASK_PROCESSING, "sws mid");
+
+	/* The worker cannot reach checkpoint 2 without the server running it. */
+	usleep(2000);
+	ASSERT_EQ(1, atomic_load_explicit(&checkpoint, memory_order_relaxed));
+
+	state = umcg_get_task_state(worker_args.utid);
+	ASSERT_TRUE(state == UMCG_TASK_BLOCKED || UMCG_TASK_UNBLOCKED);
+
+	/* Run the worker until it exits. */
+	do {
+		ASSERT_EQ(worker_args.utid, umcg_poll_worker());
+		umcg_tid utid = umcg_run_worker(worker_args.utid);
+		if (utid == UMCG_NONE) {
+			ASSERT_EQ(0, errno);
+			break;
+		}
+
+		ASSERT_EQ(utid, worker_args.utid);
+	} while (true);
+
+	/* The final check and cleanup. */
+	ASSERT_EQ(2, atomic_load_explicit(&checkpoint, memory_order_relaxed));
+	validate_state(umcg_get_utid(), UMCG_TASK_PROCESSING, "sws preunreg");
+	ASSERT_EQ(0, pthread_join(worker, NULL));
+	ASSERT_EQ(0, umcg_unregister_task());
+	validate_state(umcg_get_utid(), UMCG_TASK_NONE, "sws postunreg");
+	ASSERT_EQ(0, umcg_destroy_group(worker_args.group));
+}
+
+static void *waiting_worker(void *arg)
+{
+	int *checkpoint = (int *)arg;
+
+	atomic_store_explicit(checkpoint, 1, memory_order_relaxed);
+	if (umcg_wait(NULL)) {
+		fprintf(stderr, "umcg_wait() failed.\n");
+		exit(1);
+	}
+	atomic_store_explicit(checkpoint, 2, memory_order_relaxed);
+
+	return NULL;
+}
+
+TEST(umcg_wait_wake_test) {
+	pthread_t worker;
+	int checkpoint = 0;
+	struct worker_args worker_args;
+
+	CHECK_CONFIG();
+
+	worker_args.utid = UMCG_NONE;
+	worker_args.group = umcg_create_group(0);
+	ASSERT_NE(UMCG_NONE, worker_args.group);
+
+	worker_args.thread_fn = &waiting_worker;
+	worker_args.thread_arg = &checkpoint;
+	worker_args.tag = 0;
+
+	ASSERT_EQ(0, pthread_create(&worker, NULL, &worker_fn, &worker_args));
+
+	/* Wait for the worker to start. */
+	while (UMCG_NONE == atomic_load_explicit(&worker_args.utid,
+				memory_order_relaxed))
+		;
+
+	/*
+	 * Make sure that the worker does not checkpoint until the server
+	 * runs it.
+	 */
+	usleep(1000);
+	ASSERT_EQ(0, atomic_load_explicit(&checkpoint, memory_order_relaxed));
+
+	ASSERT_NE(0, umcg_register_server(worker_args.group, 0));
+
+	/*
+	 * Run the worker until it checkpoints 1. Need to loop because
+	 * the worker may pagefault and wake the server.
+	 */
+	do {
+		ASSERT_EQ(worker_args.utid, umcg_poll_worker());
+		ASSERT_EQ(worker_args.utid, umcg_run_worker(worker_args.utid));
+	} while (1 != atomic_load_explicit(&checkpoint, memory_order_relaxed));
+
+	validate_state(worker_args.utid, UMCG_TASK_RUNNABLE, "wait_wake wait");
+
+	/* The worker cannot reach checkpoint 2 without the server waking it. */
+	usleep(2000);
+	ASSERT_EQ(1, atomic_load_explicit(&checkpoint, memory_order_relaxed));
+	validate_state(worker_args.utid, UMCG_TASK_RUNNABLE, "wait_wake wait");
+
+
+	ASSERT_EQ(0, umcg_wake(worker_args.utid));
+
+	/*
+	 * umcg_wake() above marks the worker as RUNNING; it will become
+	 * UNBLOCKED upon wakeup as it does not have a server. But this may
+	 * be delayed.
+	 */
+	while (umcg_get_task_state(worker_args.utid) != UMCG_TASK_UNBLOCKED)
+		;
+
+	/* The worker cannot reach checkpoint 2 without the server running it. */
+	usleep(2000);
+	ASSERT_EQ(1, atomic_load_explicit(&checkpoint, memory_order_relaxed));
+
+	/* Run the worker until it exits. */
+	do {
+		ASSERT_EQ(worker_args.utid, umcg_poll_worker());
+		umcg_tid utid = umcg_run_worker(worker_args.utid);
+		if (utid == UMCG_NONE) {
+			ASSERT_EQ(0, errno);
+			break;
+		}
+
+		ASSERT_EQ(utid, worker_args.utid);
+	} while (true);
+
+	/* The final check and cleanup. */
+	ASSERT_EQ(2, atomic_load_explicit(&checkpoint, memory_order_relaxed));
+	ASSERT_EQ(0, pthread_join(worker, NULL));
+	ASSERT_EQ(0, umcg_unregister_task());
+	ASSERT_EQ(0, umcg_destroy_group(worker_args.group));
+}
+
+static void *swapping_worker(void *arg)
+{
+	umcg_tid next;
+
+	atomic_thread_fence(memory_order_acquire);
+	next = (umcg_tid)arg;
+
+	if (next == UMCG_NONE) {
+		if (0 != umcg_wait(NULL)) {
+			fprintf(stderr, "swapping_worker: umcg_wait failed\n");
+			exit(1);
+		}
+	} else {
+		if (0 != umcg_swap(next, NULL)) {
+			fprintf(stderr, "swapping_worker: umcg_swap failed\n");
+			exit(1);
+		}
+	}
+
+	return NULL;
+}
+
+TEST(umcg_swap_test) {
+	const int n_workers = 10;
+	struct worker_args *worker_args;
+	int swap_chain_wakeups = 0;
+	umcg_tid utid = UMCG_NONE;
+	bool *workers_polled;
+	pthread_t *workers;
+	umcg_t group_id;
+	int idx;
+
+	CHECK_CONFIG();
+
+	group_id = umcg_create_group(0);
+	ASSERT_NE(UMCG_NONE, group_id);
+
+	workers = malloc(n_workers * sizeof(pthread_t));
+	worker_args = malloc(n_workers * sizeof(struct worker_args));
+	workers_polled = malloc(n_workers * sizeof(bool));
+	if (!workers || !worker_args || !workers_polled) {
+		fprintf(stderr, "malloc failed\n");
+		exit(1);
+	}
+
+	memset(worker_args, 0, n_workers * sizeof(struct worker_args));
+
+	/* Start workers. All will block in umcg_register_worker(). */
+	for (idx = 0; idx < n_workers; ++idx) {
+		workers_polled[idx] = false;
+
+		worker_args[idx].group = group_id;
+		worker_args[idx].thread_fn = &swapping_worker;
+		worker_args[idx].tag = idx;
+		atomic_thread_fence(memory_order_release);
+
+		ASSERT_EQ(0, pthread_create(&workers[idx], NULL, &worker_fn,
+					&worker_args[idx]));
+	}
+
+	/* Wait for all workers to update their utids. */
+	for (idx = 0; idx < n_workers; ++idx) {
+		uint64_t counter = 0;
+		while (UMCG_NONE == atomic_load_explicit(&worker_args[idx].utid,
+					memory_order_seq_cst)) {
+			++counter;
+			if (!(counter % 1000000))
+				fprintf(stderr, "looping for utid: %d %lu\n",
+						idx, counter);
+		}
+	}
+
+	/* Update worker args. */
+	for (idx = 0; idx < (n_workers - 1); ++idx) {
+		worker_args[idx].thread_arg = (void *)worker_args[idx + 1].utid;
+	}
+	atomic_thread_fence(memory_order_release);
+
+	ASSERT_NE(0, umcg_register_server(group_id, 0));
+
+	/* Poll workers. */
+	for (idx = 0; idx < n_workers; ++idx) {
+		utid = umcg_poll_worker();
+
+		ASSERT_NE(UMCG_NONE, utid);
+		workers_polled[umcg_get_task_tag(utid)] = true;
+
+		validate_state(utid, UMCG_TASK_RUNNABLE, "swap poll");
+	}
+
+	/* Check that all workers have been polled. */
+	for (idx = 0; idx < n_workers; ++idx) {
+		ASSERT_TRUE(workers_polled[idx]);
+	}
+
+	/* Run the first worker; the swap chain will lead to the last worker. */
+	utid = worker_args[0].utid;
+	idx = 0;
+	do {
+		uint32_t state;
+
+		utid = umcg_run_worker(utid);
+		if (utid == worker_args[n_workers - 1].utid &&
+				umcg_get_task_state(utid) == UMCG_TASK_RUNNABLE)
+			break;
+
+		/* There can be an occasional mid-swap wakeup due to pagefault. */
+		++swap_chain_wakeups;
+
+		/* Validate progression. */
+		ASSERT_GE(umcg_get_task_tag(utid), idx);
+		idx = umcg_get_task_tag(utid);
+
+		/* Validate state. */
+		state = umcg_get_task_state(utid);
+		ASSERT_TRUE(state == UMCG_TASK_BLOCKED ||
+				state == UMCG_TASK_UNBLOCKED);
+
+		ASSERT_EQ(utid, umcg_poll_worker());
+	} while (true);
+
+	ASSERT_LT(swap_chain_wakeups, 4);
+	if (swap_chain_wakeups)
+		fprintf(stderr, "WARNING: %d swap chain wakeups\n",
+				swap_chain_wakeups);
+
+	/* Finally run/release all workers. */
+	for (idx = 0; idx < n_workers; ++idx) {
+		utid = worker_args[idx].utid;
+		do {
+			utid = umcg_run_worker(utid);
+			if (utid) {
+				ASSERT_EQ(utid, worker_args[idx].utid);
+				ASSERT_EQ(utid, umcg_poll_worker());
+			}
+		} while (utid != UMCG_NONE);
+	}
+
+	/* Cleanup. */
+	for (idx = 0; idx < n_workers; ++idx)
+		ASSERT_EQ(0, pthread_join(workers[idx], NULL));
+	ASSERT_EQ(0, umcg_unregister_task());
+	ASSERT_EQ(0, umcg_destroy_group(group_id));
+}
+
+TEST_HARNESS_MAIN
-- 
2.31.1.818.g46aad6cb9e-goog




[Index of Archives]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux