From: Yucong Sun <sunyucong@xxxxxxxxx> This patch adds "-j" mode to test_progs, executing tests in multiple processes. "-j" mode is optional, and works with all existing test selection mechanisms, as well as "-v", "-l" etc. In "-j" mode, the main process uses UDS/DGRAM to communicate with each forked worker, commanding it to run tests and collect logs. After all tests are finished, a summary is printed. The main process uses multiple competing threads to dispatch work to the workers, trying to keep them all busy. Example output: > ./test_progs -n 15-20 -j [ 8.584709] bpf_testmod: loading out-of-tree module taints kernel. Launching 2 workers. [0]: Running test 15. [1]: Running test 16. [1]: Running test 17. [1]: Running test 18. [1]: Running test 19. [1]: Running test 20. [1]: worker exit. [0]: worker exit. #15 btf_dump:OK #16 btf_endian:OK #17 btf_map_in_map:OK #18 btf_module:OK #19 btf_skc_cls_ingress:OK #20 btf_split:OK Summary: 6/20 PASSED, 0 SKIPPED, 0 FAILED Known issue: Some tests fail when running concurrently; a later patch will either fix the tests or pin them to worker 0. Signed-off-by: Yucong Sun <sunyucong@xxxxxxxxx> V4 -> V3: address style warnings. V3 -> V2: fix missing outputs in commit messages. V2 -> V1: switch to UDS client/server model. 
--- tools/testing/selftests/bpf/test_progs.c | 458 ++++++++++++++++++++++- tools/testing/selftests/bpf/test_progs.h | 36 +- 2 files changed, 480 insertions(+), 14 deletions(-) diff --git a/tools/testing/selftests/bpf/test_progs.c b/tools/testing/selftests/bpf/test_progs.c index 2ed01f615d20..c542e2d2f893 100644 --- a/tools/testing/selftests/bpf/test_progs.c +++ b/tools/testing/selftests/bpf/test_progs.c @@ -12,6 +12,11 @@ #include <string.h> #include <execinfo.h> /* backtrace */ #include <linux/membarrier.h> +#include <sys/sysinfo.h> /* get_nprocs */ +#include <netinet/in.h> +#include <sys/select.h> +#include <sys/socket.h> +#include <sys/un.h> /* Adapted from perf/util/string.c */ static bool glob_match(const char *str, const char *pat) @@ -48,6 +53,7 @@ struct prog_test_def { bool force_log; int error_cnt; int skip_cnt; + int sub_succ_cnt; bool tested; bool need_cgroup_cleanup; @@ -97,6 +103,10 @@ static void dump_test_log(const struct prog_test_def *test, bool failed) if (stdout == env.stdout) return; + /* worker always holds log */ + if (env.worker_index != -1) + return; + fflush(stdout); /* exports env.log_buf & env.log_cnt */ if (env.verbosity > VERBOSE_NONE || test->force_log || failed) { @@ -172,14 +182,14 @@ void test__end_subtest() dump_test_log(test, sub_error_cnt); - fprintf(env.stdout, "#%d/%d %s/%s:%s\n", + fprintf(stdout, "#%d/%d %s/%s:%s\n", test->test_num, test->subtest_num, test->test_name, test->subtest_name, sub_error_cnt ? "FAIL" : (test->skip_cnt ? 
"SKIP" : "OK")); if (sub_error_cnt) - env.fail_cnt++; + test->error_cnt++; else if (test->skip_cnt == 0) - env.sub_succ_cnt++; + test->sub_succ_cnt++; skip_account(); free(test->subtest_name); @@ -474,6 +484,7 @@ enum ARG_KEYS { ARG_LIST_TEST_NAMES = 'l', ARG_TEST_NAME_GLOB_ALLOWLIST = 'a', ARG_TEST_NAME_GLOB_DENYLIST = 'd', + ARG_NUM_WORKERS = 'j', }; static const struct argp_option opts[] = { @@ -495,6 +506,8 @@ static const struct argp_option opts[] = { "Run tests with name matching the pattern (supports '*' wildcard)." }, { "deny", ARG_TEST_NAME_GLOB_DENYLIST, "NAMES", 0, "Don't run tests with name matching the pattern (supports '*' wildcard)." }, + { "workers", ARG_NUM_WORKERS, "WORKERS", OPTION_ARG_OPTIONAL, + "Number of workers to run in parallel, default to number of cpus." }, {}, }; @@ -661,6 +674,17 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) case ARG_LIST_TEST_NAMES: env->list_test_names = true; break; + case ARG_NUM_WORKERS: + if (arg) { + env->workers = atoi(arg); + if (!env->workers) { + fprintf(stderr, "Invalid number of worker: %s.", arg); + return -1; + } + } else { + env->workers = get_nprocs(); + } + break; case ARGP_KEY_ARG: argp_usage(state); break; @@ -678,7 +702,7 @@ static void stdio_hijack(void) env.stdout = stdout; env.stderr = stderr; - if (env.verbosity > VERBOSE_NONE) { + if (env.verbosity > VERBOSE_NONE && env.worker_index == -1) { /* nothing to do, output to stdout by default */ return; } @@ -704,10 +728,6 @@ static void stdio_restore(void) return; fclose(stdout); - free(env.log_buf); - - env.log_buf = NULL; - env.log_cnt = 0; stdout = env.stdout; stderr = env.stderr; @@ -799,6 +819,366 @@ void crash_handler(int signum) backtrace_symbols_fd(bt, sz, STDERR_FILENO); } +int current_test_idx = 0; +pthread_mutex_t current_test_lock; + +struct test_result { + int error_cnt; + int skip_cnt; + int sub_succ_cnt; + + size_t log_cnt; + char *log_buf; +}; + +struct test_result 
test_results[ARRAY_SIZE(prog_test_defs)]; + +static inline const char *str_msg(const struct message *msg) +{ + static char buf[255]; + + switch (msg->type) { + case MSG_DO_TEST: + sprintf(buf, "MSG_DO_TEST %d", msg->u.message_do_test.num); + break; + case MSG_TEST_DONE: + sprintf(buf, "MSG_TEST_DONE %d (log: %d)", + msg->u.message_test_done.num, + msg->u.message_test_done.have_log); + break; + case MSG_TEST_LOG: + sprintf(buf, "MSG_TEST_LOG (cnt: %ld, last: %d)", + strlen(msg->u.message_test_log.log_buf), + msg->u.message_test_log.is_last); + break; + case MSG_EXIT: + sprintf(buf, "MSG_EXIT"); + break; + default: + sprintf(buf, "UNKNOWN"); + break; + } + + return buf; +} + +int send_message(int sock, const struct message *msg) +{ + if (env.verbosity > VERBOSE_NONE) + fprintf(stderr, "Sending msg: %s\n", str_msg(msg)); + return send(sock, msg, sizeof(*msg), 0); +} + +int recv_message(int sock, struct message *msg) +{ + int ret; + + memset(msg, 0, sizeof(*msg)); + ret = recv(sock, msg, sizeof(*msg), 0); + if (ret >= 0) { + if (env.verbosity > VERBOSE_NONE) + fprintf(stderr, "Received msg: %s\n", str_msg(msg)); + } + return ret; +} + +struct dispatch_data { + int idx; + int sock; +}; + +void *dispatch_thread(void *_data) +{ + struct dispatch_data *data; + int sock; + FILE *log_fd = NULL; + + data = (struct dispatch_data *)_data; + sock = data->sock; + + while (true) { + int test_to_run = -1; + struct prog_test_def *test; + struct test_result *result; + + /* grab a test */ + { + pthread_mutex_lock(&current_test_lock); + + if (current_test_idx >= prog_test_cnt) { + pthread_mutex_unlock(&current_test_lock); + goto done; + } + test_to_run = current_test_idx; + current_test_idx++; + + pthread_mutex_unlock(&current_test_lock); + } + + test = &prog_test_defs[test_to_run]; + test->test_num = test_to_run + 1; + + if (!should_run(&env.test_selector, + test->test_num, test->test_name)) + continue; + + /* run test through worker */ + { + struct message msg_do_test; + + msg_do_test.type = 
MSG_DO_TEST; + msg_do_test.u.message_do_test.num = test_to_run; + if (send_message(sock, &msg_do_test) < 0) { + perror("Fail to send command"); + goto done; + } + env.worker_current_test[data->idx] = test_to_run; + } + + /* wait for test done */ + { + struct message msg_test_done; + + if (recv_message(sock, &msg_test_done) < 0) + goto error; + if (msg_test_done.type != MSG_TEST_DONE) + goto error; + if (test_to_run != msg_test_done.u.message_test_done.num) + goto error; + + result = &test_results[test_to_run]; + + test->tested = true; + + result->error_cnt = msg_test_done.u.message_test_done.error_cnt; + result->skip_cnt = msg_test_done.u.message_test_done.skip_cnt; + result->sub_succ_cnt = msg_test_done.u.message_test_done.sub_succ_cnt; + + /* collect all logs */ + if (msg_test_done.u.message_test_done.have_log) { + log_fd = open_memstream(&result->log_buf, &result->log_cnt); + if (!log_fd) + goto error; + + while (true) { + struct message msg_log; + + if (recv_message(sock, &msg_log) < 0) + goto error; + if (msg_log.type != MSG_TEST_LOG) + goto error; + + fprintf(log_fd, "%s", msg_log.u.message_test_log.log_buf); + if (msg_log.u.message_test_log.is_last) + break; + } + fclose(log_fd); + log_fd = NULL; + } + } + } /* while (true) */ +error: + fprintf(stderr, "[%d]: Protocol/IO error: %s", data->idx, strerror(errno)); + + if (log_fd) + fclose(log_fd); +done: + { + struct message msg_exit; + + msg_exit.type = MSG_EXIT; + if (send_message(sock, &msg_exit) < 0) + fprintf(stderr, "[%d]: send_message msg_exit: %s", data->idx, strerror(errno)); + } + return NULL; +} + +int server_main(void) +{ + pthread_t *dispatcher_threads; + struct dispatch_data *data; + + dispatcher_threads = calloc(sizeof(pthread_t), env.workers); + data = calloc(sizeof(struct dispatch_data), env.workers); + + env.worker_current_test = calloc(sizeof(int), env.workers); + for (int i = 0; i < env.workers; i++) { + int rc; + + data[i].idx = i; + data[i].sock = env.worker_socks[i]; + rc = 
pthread_create(&dispatcher_threads[i], NULL, dispatch_thread, &data[i]); + if (rc < 0) { + perror("Failed to launch dispatcher thread"); + return -1; + } + } + + /* wait for all dispatcher to finish */ + for (int i = 0; i < env.workers; i++) { + while (true) { + struct timespec timeout = { + .tv_sec = time(NULL) + 5, + .tv_nsec = 0 + }; + if (pthread_timedjoin_np(dispatcher_threads[i], NULL, &timeout) != ETIMEDOUT) + break; + fprintf(stderr, "Still waiting for thread %d (test %d).\n", i, env.worker_current_test[i] + 1); + } + } + free(dispatcher_threads); + free(env.worker_current_test); + free(data); + + /* generate summary */ + fflush(stderr); + fflush(stdout); + + for (int i = 0; i < prog_test_cnt; i++) { + struct prog_test_def *current_test; + struct test_result *result; + + current_test = &prog_test_defs[i]; + result = &test_results[i]; + + if (!current_test->tested) + continue; + + env.succ_cnt += result->error_cnt ? 0 : 1; + env.skip_cnt += result->skip_cnt; + env.fail_cnt += result->error_cnt; + env.sub_succ_cnt += result->sub_succ_cnt; + + if (result->log_cnt) { + result->log_buf[result->log_cnt] = '\0'; + fprintf(stdout, "%s", result->log_buf); + if (result->log_buf[result->log_cnt - 1] != '\n') + fprintf(stdout, "\n"); + } + fprintf(stdout, "#%d %s:%s\n", + current_test->test_num, current_test->test_name, + result->error_cnt ? "FAIL" : (result->skip_cnt ? 
"SKIP" : "OK")); + } + + fprintf(stdout, "Summary: %d/%d PASSED, %d SKIPPED, %d FAILED\n", + env.succ_cnt, env.sub_succ_cnt, env.skip_cnt, env.fail_cnt); + + /* reap all workers */ + for (int i = 0; i < env.workers; i++) { + int wstatus, pid; + + pid = waitpid(env.worker_pids[i], &wstatus, 0); + assert(pid == env.worker_pids[i]); + } + + return 0; +} + +int worker_main(int sock) +{ + save_netns(); + + while (true) { + /* receive command */ + struct message msg; + + assert(recv_message(sock, &msg) >= 0); + + switch (msg.type) { + case MSG_EXIT: + fprintf(stderr, "[%d]: worker exit.\n", env.worker_index); + goto out; + case MSG_DO_TEST: { + int test_to_run; + struct prog_test_def *test; + struct message msg_done; + + test_to_run = msg.u.message_do_test.num; + + fprintf(stderr, "[%d]: Running test %d.\n", + env.worker_index, test_to_run + 1); + + test = &prog_test_defs[test_to_run]; + + env.test = test; + test->test_num = test_to_run + 1; + + stdio_hijack(); + + test->run_test(); + + /* ensure last sub-test is finalized properly */ + if (test->subtest_name) + test__end_subtest(); + + stdio_restore(); + + test->tested = true; + + skip_account(); + + reset_affinity(); + restore_netns(); + if (test->need_cgroup_cleanup) + cleanup_cgroup_environment(); + + memset(&msg_done, 0, sizeof(msg_done)); + msg_done.type = MSG_TEST_DONE; + msg_done.u.message_test_done.num = test_to_run; + msg_done.u.message_test_done.error_cnt = test->error_cnt; + msg_done.u.message_test_done.skip_cnt = test->skip_cnt; + msg_done.u.message_test_done.sub_succ_cnt = test->sub_succ_cnt; + msg_done.u.message_test_done.have_log = false; + + if (env.verbosity > VERBOSE_NONE || test->force_log || test->error_cnt) { + if (env.log_cnt) + msg_done.u.message_test_done.have_log = true; + } + assert(send_message(sock, &msg_done) >= 0); + + /* send logs */ + if (msg_done.u.message_test_done.have_log) { + char *src; + size_t slen; + + src = env.log_buf; + slen = env.log_cnt; + while (slen) { + struct message 
msg_log; + char *dest; + size_t len; + + memset(&msg_log, 0, sizeof(msg_log)); + msg_log.type = MSG_TEST_LOG; + dest = msg_log.u.message_test_log.log_buf; + len = slen >= MAX_LOG_TRUNK_SIZE ? MAX_LOG_TRUNK_SIZE : slen; + memcpy(dest, src, len); + + src += len; + slen -= len; + if (!slen) + msg_log.u.message_test_log.is_last = true; + + assert(send_message(sock, &msg_log) >= 0); + } + } + if (env.log_buf) { + free(env.log_buf); + env.log_buf = NULL; + env.log_cnt = 0; + } + break; + } /* case MSG_DO_TEST */ + default: + fprintf(stderr, "[%d]: unknown message.\n", env.worker_index); + return -1; + } + } +out: + restore_netns(); + return 0; +} + int main(int argc, char **argv) { static const struct argp argp = { @@ -837,13 +1217,57 @@ int main(int argc, char **argv) return -1; } - save_netns(); - stdio_hijack(); env.has_testmod = true; if (!env.list_test_names && load_bpf_testmod()) { fprintf(env.stderr, "WARNING! Selftests relying on bpf_testmod.ko will be skipped.\n"); env.has_testmod = false; } + + /* ignore workers if we are just listing */ + if (env.get_test_cnt || env.list_test_names) + env.workers = 0; + + /* launch workers if requested */ + env.worker_index = -1; /* main process */ + if (env.workers) { + env.worker_pids = calloc(sizeof(__pid_t), env.workers); + env.worker_socks = calloc(sizeof(int), env.workers); + fprintf(stdout, "Launching %d workers.\n", env.workers); + for (int i = 0; i < env.workers; i++) { + int sv[2]; + __pid_t pid; + + if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sv) < 0) { + perror("Fail to create worker socket"); + return -1; + } + pid = fork(); + if (pid < 0) { + perror("Failed to fork worker"); + return -1; + } else if (pid != 0) { /* main process */ + close(sv[1]); + env.worker_pids[i] = pid; + env.worker_socks[i] = sv[0]; + } else { /* inside each worker process */ + close(sv[0]); + env.worker_index = i; + return worker_main(sv[1]); + } + } + + if (env.worker_index == -1) { + server_main(); + goto out; + } + } + + /* The rest of the 
main process */ + + /* on single mode */ + save_netns(); + stdio_hijack(); + for (i = 0; i < prog_test_cnt; i++) { struct prog_test_def *test = &prog_test_defs[i]; @@ -866,6 +1290,7 @@ int main(int argc, char **argv) } test->run_test(); + /* ensure last sub-test is finalized properly */ if (test->subtest_name) test__end_subtest(); @@ -882,16 +1307,21 @@ int main(int argc, char **argv) env.fail_cnt++; else env.succ_cnt++; + skip_account(); + env.sub_succ_cnt += test->sub_succ_cnt; reset_affinity(); restore_netns(); if (test->need_cgroup_cleanup) cleanup_cgroup_environment(); } - if (!env.list_test_names && env.has_testmod) - unload_bpf_testmod(); stdio_restore(); + if (env.log_buf) { + free(env.log_buf); + env.log_buf = NULL; + env.log_cnt = 0; + } if (env.get_test_cnt) { printf("%d\n", env.succ_cnt); @@ -904,14 +1334,16 @@ int main(int argc, char **argv) fprintf(stdout, "Summary: %d/%d PASSED, %d SKIPPED, %d FAILED\n", env.succ_cnt, env.sub_succ_cnt, env.skip_cnt, env.fail_cnt); + close(env.saved_netns_fd); out: + if (!env.list_test_names && env.has_testmod) + unload_bpf_testmod(); free_str_set(&env.test_selector.blacklist); free_str_set(&env.test_selector.whitelist); free(env.test_selector.num_set); free_str_set(&env.subtest_selector.blacklist); free_str_set(&env.subtest_selector.whitelist); free(env.subtest_selector.num_set); - close(env.saved_netns_fd); if (env.succ_cnt + env.fail_cnt + env.skip_cnt == 0) return EXIT_NO_TEST; diff --git a/tools/testing/selftests/bpf/test_progs.h b/tools/testing/selftests/bpf/test_progs.h index 94bef0aa74cf..aadb44543225 100644 --- a/tools/testing/selftests/bpf/test_progs.h +++ b/tools/testing/selftests/bpf/test_progs.h @@ -69,7 +69,8 @@ struct test_env { bool get_test_cnt; bool list_test_names; - struct prog_test_def *test; + struct prog_test_def *test; /* current running tests */ + FILE *stdout; FILE *stderr; char *log_buf; @@ -82,6 +83,39 @@ struct test_env { int skip_cnt; /* skipped tests */ int saved_netns_fd; + int workers; 
/* number of worker process */ + int worker_index; /* index number of current worker, main process is -1 */ + __pid_t *worker_pids; /* array of worker pids */ + int *worker_socks; /* array of worker socks */ + int *worker_current_test; /* array of current running test for each worker */ +}; + +#define MAX_LOG_TRUNK_SIZE 8192 +enum message_type { + MSG_DO_TEST = 0, + MSG_TEST_DONE = 1, + MSG_TEST_LOG = 2, + MSG_EXIT = 255, +}; +struct message { + enum message_type type; + union { + struct { + int num; + } message_do_test; + struct { + int num; + + int sub_succ_cnt; + int error_cnt; + int skip_cnt; + bool have_log; + } message_test_done; + struct { + char log_buf[MAX_LOG_TRUNK_SIZE + 1]; + bool is_last; + } message_test_log; + } u; }; extern struct test_env env; -- 2.30.2