@@ -12,6 +12,11 @@
#include <string.h>
#include <execinfo.h> /* backtrace */
#include <linux/membarrier.h>
+#include <sys/sysinfo.h> /* get_nprocs */
+#include <netinet/in.h>
+#include <sys/select.h>
+#include <sys/socket.h>
+#include <sys/un.h>
/* Adapted from perf/util/string.c */
static bool glob_match(const char *str, const char *pat)
@@ -48,6 +53,8 @@ struct prog_test_def {
bool force_log;
int error_cnt;
int skip_cnt;
+ int sub_succ_cnt;
+ bool should_run;
bool tested;
bool need_cgroup_cleanup;
@@ -97,6 +104,10 @@ static void dump_test_log(const struct prog_test_def *test, bool failed)
if (stdout == env.stdout)
return;
+ /* worker always holds log */
+ if (env.worker_id != -1)
+ return;
+
fflush(stdout); /* exports env.log_buf & env.log_cnt */
if (env.verbosity > VERBOSE_NONE || test->force_log || failed) {
@@ -107,8 +118,6 @@ static void dump_test_log(const struct prog_test_def *test, bool failed)
fprintf(env.stdout, "\n");
}
}
-
- fseeko(stdout, 0, SEEK_SET); /* rewind */
}
static void skip_account(void)
@@ -172,14 +181,14 @@ void test__end_subtest()
dump_test_log(test, sub_error_cnt);
- fprintf(env.stdout, "#%d/%d %s/%s:%s\n",
+ fprintf(stdout, "#%d/%d %s/%s:%s\n",
test->test_num, test->subtest_num, test->test_name, test->subtest_name,
sub_error_cnt ? "FAIL" : (test->skip_cnt ? "SKIP" : "OK"));
if (sub_error_cnt)
- env.fail_cnt++;
+ test->error_cnt++;
else if (test->skip_cnt == 0)
- env.sub_succ_cnt++;
+ test->sub_succ_cnt++;
skip_account();
free(test->subtest_name);
@@ -474,6 +483,8 @@ enum ARG_KEYS {
ARG_LIST_TEST_NAMES = 'l',
ARG_TEST_NAME_GLOB_ALLOWLIST = 'a',
ARG_TEST_NAME_GLOB_DENYLIST = 'd',
+ ARG_NUM_WORKERS = 'j',
+ ARG_DEBUG = -1,
};
static const struct argp_option opts[] = {
@@ -495,6 +506,10 @@ static const struct argp_option opts[] = {
"Run tests with name matching the pattern (supports '*' wildcard)." },
{ "deny", ARG_TEST_NAME_GLOB_DENYLIST, "NAMES", 0,
"Don't run tests with name matching the pattern (supports '*' wildcard)." },
+ { "workers", ARG_NUM_WORKERS, "WORKERS", OPTION_ARG_OPTIONAL,
+ "Number of workers to run in parallel, default to number of cpus." },
+ { "debug", ARG_DEBUG, NULL, 0,
+ "print extra debug information for test_progs." },
{},
};
@@ -650,7 +665,7 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
fprintf(stderr,
"Unable to setenv SELFTESTS_VERBOSE=1 (errno=%d)",
errno);
- return -1;
+ return -EINVAL;
}
}
@@ -661,6 +676,20 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
case ARG_LIST_TEST_NAMES:
env->list_test_names = true;
break;
+ case ARG_NUM_WORKERS:
+ if (arg) {
+ env->workers = atoi(arg);
+ if (!env->workers) {
+ fprintf(stderr, "Invalid number of worker: %s.", arg);
+ return -EINVAL;
+ }
+ } else {
+ env->workers = get_nprocs();
+ }
+ break;
+ case ARG_DEBUG:
+ env->debug = true;
+ break;
case ARGP_KEY_ARG:
argp_usage(state);
break;
@@ -678,7 +707,7 @@ static void stdio_hijack(void)
env.stdout = stdout;
env.stderr = stderr;
- if (env.verbosity > VERBOSE_NONE) {
+ if (env.verbosity > VERBOSE_NONE && env.worker_id == -1) {
/* nothing to do, output to stdout by default */
return;
}
@@ -704,10 +733,6 @@ static void stdio_restore(void)
return;
fclose(stdout);
- free(env.log_buf);
-
- env.log_buf = NULL;
- env.log_cnt = 0;
stdout = env.stdout;
stderr = env.stderr;
@@ -794,11 +819,456 @@ void crash_handler(int signum)
dump_test_log(env.test, true);
if (env.stdout)
stdio_restore();
-
+ if (env.worker_id != -1)
+ fprintf(stderr, "[%d]: ", env.worker_id);
fprintf(stderr, "Caught signal #%d!\nStack trace:\n", signum);
backtrace_symbols_fd(bt, sz, STDERR_FILENO);
}
+/*
+ * SIGINT handler installed by server_main(): walks env.worker_socks and
+ * closes every entry holding a descriptor greater than zero.
+ * The signal number itself is not used.
+ */
+void sigint_handler(int signum)
+{
+	int idx = 0;
+
+	while (idx < env.workers) {
+		int fd = env.worker_socks[idx++];
+
+		if (fd <= 0)
+			continue;
+		close(fd);
+	}
+}
+
+/* index into prog_test_defs of the next test to hand out; guarded by current_test_lock */
+static int current_test_idx;
+/* statically allocated mutexes must be initialized before first use */
+static pthread_mutex_t current_test_lock = PTHREAD_MUTEX_INITIALIZER;
+/* held by a dispatcher thread while it prints one finished test's log and verdict */
+static pthread_mutex_t stdout_output_lock = PTHREAD_MUTEX_INITIALIZER;
+
+/*
+ * Outcome of one test as kept by the main process. Filled either from a
+ * worker's MSG_TEST_DONE message (dispatch_thread) or directly from the
+ * prog_test_def after an in-process run (main).
+ */
+struct test_result {
+ /* non-zero means the test is reported as FAIL */
+ int error_cnt;
+ /* non-zero (with error_cnt == 0) means the test is reported as SKIP */
+ int skip_cnt;
+ /* added into env.sub_succ_cnt when the summary is generated */
+ int sub_succ_cnt;
+
+ /* number of bytes in log_buf; 0 means there is no log to print */
+ size_t log_cnt;
+ /* heap buffer with the captured test output (open_memstream or strdup) */
+ char *log_buf;
+};
+
+/* one result slot per entry of prog_test_defs, addressed by the same index */
+static struct test_result test_results[ARRAY_SIZE(prog_test_defs)];
+
+/*
+ * Format a one-line, human-readable description of @msg into @buf for the
+ * debug traces printed by send_message()/recv_message().
+ *
+ * @msg: message to describe; for MSG_TEST_LOG the log_buf member must be
+ *       NUL-terminated because its length is taken with strlen().
+ * @buf: caller-provided output buffer. Its size is not passed in, so it has
+ *       to be large enough for the longest description (callers in this
+ *       file hand in 256 bytes).
+ *
+ * Returns @buf.
+ */
+static inline const char *str_msg(const struct msg *msg, char *buf)
+{
+	switch (msg->type) {
+	case MSG_DO_TEST:
+		sprintf(buf, "MSG_DO_TEST %d", msg->do_test.test_num);
+		break;
+	case MSG_TEST_DONE:
+		sprintf(buf, "MSG_TEST_DONE %d (log: %d)",
+			msg->test_done.test_num,
+			msg->test_done.have_log);
+		break;
+	case MSG_TEST_LOG:
+		/* strlen() yields size_t, which must be printed with %zu */
+		sprintf(buf, "MSG_TEST_LOG (cnt: %zu, last: %d)",
+			strlen(msg->test_log.log_buf),
+			msg->test_log.is_last);
+		break;
+	case MSG_EXIT:
+		sprintf(buf, "MSG_EXIT");
+		break;
+	default:
+		sprintf(buf, "UNKNOWN");
+		break;
+	}
+
+	return buf;
+}
+
+/*
+ * Transmit one whole struct msg over @sock with send().
+ * When env.debug is set, a description of the message is first written to
+ * stderr. Returns whatever send() returned.
+ */
+static int send_message(int sock, const struct msg *msg)
+{
+	if (env.debug) {
+		char desc[256];
+
+		fprintf(stderr, "Sending msg: %s\n", str_msg(msg, desc));
+	}
+
+	return send(sock, msg, sizeof(*msg), 0);
+}
+
+/*
+ * Receive one struct msg from @sock into @msg.
+ *
+ * @msg is zeroed before the call to recv(), so a shorter-than-expected
+ * packet never exposes stale bytes. When env.debug is set, a description of
+ * the received message is written to stderr.
+ *
+ * Returns the positive byte count reported by recv() on success, or a
+ * negative value on failure. A return of 0 from recv() means the peer
+ * closed the socket; that is reported as -1 with errno set to ECONNRESET,
+ * because the zeroed @msg would otherwise be indistinguishable from a
+ * genuine MSG_DO_TEST request for test 0.
+ */
+static int recv_message(int sock, struct msg *msg)
+{
+	char buf[256];
+	int ret;
+
+	memset(msg, 0, sizeof(*msg));
+	ret = recv(sock, msg, sizeof(*msg), 0);
+	if (ret < 0)
+		return ret;
+	if (ret == 0) {
+		/* orderly shutdown by the peer: nothing was received */
+		errno = ECONNRESET;
+		return -1;
+	}
+
+	if (env.debug)
+		fprintf(stderr, "Received msg: %s\n", str_msg(msg, buf));
+	return ret;
+}
+
+/*
+ * Execute the test stored at index @test_num of prog_test_defs in the
+ * calling process: publish it as env.test, invoke its run_test callback,
+ * close a sub-test that is still open, mark it tested, hand its log to
+ * dump_test_log(), then reset CPU affinity, restore the network namespace
+ * and, when the test asked for it, clean up the cgroup environment.
+ */
+static void run_one_test(int test_num)
+{
+	struct prog_test_def *cur = &prog_test_defs[test_num];
+
+	env.test = cur;
+	cur->run_test();
+
+	/* a sub-test left open by the test body still has to be finalized */
+	if (cur->subtest_name != NULL)
+		test__end_subtest();
+
+	cur->tested = true;
+	dump_test_log(cur, cur->error_cnt);
+
+	reset_affinity();
+	restore_netns();
+
+	if (cur->need_cgroup_cleanup)
+		cleanup_cgroup_environment();
+}
+
+/* per-thread argument handed to dispatch_thread() by server_main() */
+struct dispatch_data {
+ /* index of the worker this thread talks to (slot in the env.worker_* arrays) */
+ int worker_id;
+ /* main-process end of that worker's socket, copied from env.worker_socks */
+ int sock_fd;
+};
+
+/*
+ * Body of one dispatcher thread in the main process.
+ *
+ * @ctx points to the struct dispatch_data for this thread. The thread
+ * repeatedly takes the next index from current_test_idx (under
+ * current_test_lock), skips tests whose should_run flag is clear, and for
+ * the others:
+ *   1. sends MSG_DO_TEST to its worker,
+ *   2. waits for the matching MSG_TEST_DONE and copies the counters into
+ *      test_results[],
+ *   3. if the worker announced a log, concatenates the MSG_TEST_LOG chunks
+ *      into result->log_buf via open_memstream(),
+ *   4. prints the log and the verdict line under stdout_output_lock.
+ *
+ * The loop ends when no tests are left, when sending fails, or on any
+ * receive/protocol error. In every case a MSG_EXIT is sent to the worker
+ * before returning. Always returns NULL.
+ */
+void *dispatch_thread(void *ctx)
+{
+	struct dispatch_data *data = ctx;
+	int sock_fd;
+	FILE *log_fd = NULL;
+
+	sock_fd = data->sock_fd;
+
+	while (true) {
+		int test_to_run = -1;
+		struct prog_test_def *test;
+		struct test_result *result;
+
+		/* grab a test */
+		{
+			pthread_mutex_lock(&current_test_lock);
+
+			if (current_test_idx >= prog_test_cnt) {
+				pthread_mutex_unlock(&current_test_lock);
+				goto done;
+			}
+
+			test = &prog_test_defs[current_test_idx];
+			test_to_run = current_test_idx;
+			current_test_idx++;
+
+			pthread_mutex_unlock(&current_test_lock);
+		}
+
+		if (!test->should_run)
+			continue;
+
+		/* run test through worker */
+		{
+			struct msg msg_do_test;
+
+			/* the whole struct goes over the socket; don't send stack garbage */
+			memset(&msg_do_test, 0, sizeof(msg_do_test));
+			msg_do_test.type = MSG_DO_TEST;
+			msg_do_test.do_test.test_num = test_to_run;
+			if (send_message(sock_fd, &msg_do_test) < 0) {
+				perror("Fail to send command");
+				goto done;
+			}
+			env.worker_current_test[data->worker_id] = test_to_run;
+		}
+
+		/* wait for test done */
+		{
+			int err;
+			struct msg msg_test_done;
+
+			err = recv_message(sock_fd, &msg_test_done);
+			if (err < 0)
+				goto error;
+			if (msg_test_done.type != MSG_TEST_DONE)
+				goto error;
+			if (test_to_run != msg_test_done.test_done.test_num)
+				goto error;
+
+			test->tested = true;
+			result = &test_results[test_to_run];
+
+			result->error_cnt = msg_test_done.test_done.error_cnt;
+			result->skip_cnt = msg_test_done.test_done.skip_cnt;
+			result->sub_succ_cnt = msg_test_done.test_done.sub_succ_cnt;
+
+			/* collect all logs */
+			if (msg_test_done.test_done.have_log) {
+				log_fd = open_memstream(&result->log_buf, &result->log_cnt);
+				if (!log_fd)
+					goto error;
+
+				while (true) {
+					struct msg msg_log;
+
+					if (recv_message(sock_fd, &msg_log) < 0)
+						goto error;
+					if (msg_log.type != MSG_TEST_LOG)
+						goto error;
+
+					fprintf(log_fd, "%s", msg_log.test_log.log_buf);
+					if (msg_log.test_log.is_last)
+						break;
+				}
+				/* closing the stream finalizes log_buf and log_cnt */
+				fclose(log_fd);
+				log_fd = NULL;
+			}
+			/* output log */
+			{
+				pthread_mutex_lock(&stdout_output_lock);
+
+				if (result->log_cnt) {
+					result->log_buf[result->log_cnt] = '\0';
+					fprintf(stdout, "%s", result->log_buf);
+					if (result->log_buf[result->log_cnt - 1] != '\n')
+						fprintf(stdout, "\n");
+				}
+
+				fprintf(stdout, "#%d %s:%s\n",
+					test->test_num, test->test_name,
+					result->error_cnt ? "FAIL" : (result->skip_cnt ? "SKIP" : "OK"));
+
+				pthread_mutex_unlock(&stdout_output_lock);
+			}
+
+		} /* wait for test done */
+	} /* while (true) */
+error:
+	if (env.debug)
+		fprintf(stderr, "[%d]: Protocol/IO error: %s.\n", data->worker_id, strerror(errno));
+
+	/* a log stream may still be open if the error hit mid-collection */
+	if (log_fd)
+		fclose(log_fd);
+done:
+	{
+		struct msg msg_exit;
+
+		memset(&msg_exit, 0, sizeof(msg_exit));
+		msg_exit.type = MSG_EXIT;
+		if (send_message(sock_fd, &msg_exit) < 0) {
+			if (env.debug)
+				fprintf(stderr, "[%d]: send_message msg_exit: %s.\n",
+					data->worker_id, strerror(errno));
+		}
+	}
+	return NULL;
+}
+
+/*
+ * Re-print, after the summary, the verdict line and captured log of every
+ * test that was run and has a non-zero error count in test_results[].
+ * The "All error logs:" banner is emitted only when env.fail_cnt is set.
+ */
+static void print_all_error_logs(void)
+{
+	int idx;
+
+	if (env.fail_cnt != 0)
+		fprintf(stdout, "\nAll error logs:\n");
+
+	for (idx = 0; idx < prog_test_cnt; idx++) {
+		struct prog_test_def *def = &prog_test_defs[idx];
+		struct test_result *res = &test_results[idx];
+		const char *verdict;
+
+		/* only tests that ran and failed are repeated here */
+		if (!(def->tested && res->error_cnt))
+			continue;
+
+		if (res->error_cnt)
+			verdict = "FAIL";
+		else if (res->skip_cnt)
+			verdict = "SKIP";
+		else
+			verdict = "OK";
+
+		fprintf(stdout, "\n#%d %s:%s\n",
+			def->test_num, def->test_name, verdict);
+
+		if (!res->log_cnt)
+			continue;
+
+		res->log_buf[res->log_cnt] = '\0';
+		fprintf(stdout, "%s", res->log_buf);
+		/* make sure the log ends with a line break */
+		if (res->log_buf[res->log_cnt - 1] != '\n')
+			fprintf(stdout, "\n");
+	}
+}
+
+/*
+ * Main-process side of parallel mode.
+ *
+ * Installs sigint_handler for SIGINT, starts one dispatch_thread per worker
+ * (env.workers of them, each bound to env.worker_socks[i]), polls once per
+ * second until every dispatcher has finished, folds test_results[] into the
+ * env.*_cnt counters, prints the summary line and the error logs, and
+ * finally reaps all worker processes with waitpid().
+ *
+ * Exits with EXIT_ERR_SETUP_INFRA if the dispatcher bookkeeping cannot be
+ * allocated or a thread cannot be created. Returns 0 otherwise.
+ */
+static int server_main(void)
+{
+	pthread_t *dispatcher_threads;
+	struct dispatch_data *data;
+	struct sigaction sigact_int = {
+		.sa_handler = sigint_handler,
+		.sa_flags = SA_RESETHAND,
+	};
+
+	sigaction(SIGINT, &sigact_int, NULL);
+
+	dispatcher_threads = calloc(env.workers, sizeof(*dispatcher_threads));
+	data = calloc(env.workers, sizeof(*data));
+	env.worker_current_test = calloc(env.workers, sizeof(*env.worker_current_test));
+	if (!dispatcher_threads || !data || !env.worker_current_test) {
+		fprintf(stderr, "Failed to allocate dispatcher state\n");
+		exit(EXIT_ERR_SETUP_INFRA);
+	}
+
+	for (int i = 0; i < env.workers; i++) {
+		int rc;
+
+		data[i].worker_id = i;
+		data[i].sock_fd = env.worker_socks[i];
+		rc = pthread_create(&dispatcher_threads[i], NULL, dispatch_thread, &data[i]);
+		if (rc) {
+			/*
+			 * pthread_create() reports failure through a positive
+			 * return value and leaves errno alone.
+			 */
+			errno = rc;
+			perror("Failed to launch dispatcher thread");
+			exit(EXIT_ERR_SETUP_INFRA);
+		}
+	}
+
+	/* wait for all dispatcher to finish */
+	for (int i = 0; i < env.workers; i++) {
+		while (true) {
+			int ret = pthread_tryjoin_np(dispatcher_threads[i], NULL);
+
+			if (!ret) {
+				break;
+			} else if (ret == EBUSY) {
+				/* still running: report progress in debug mode and retry in 1s */
+				if (env.debug)
+					fprintf(stderr, "Still waiting for thread %d (test %d).\n",
+						i, env.worker_current_test[i] + 1);
+				usleep(1000 * 1000);
+				continue;
+			} else {
+				fprintf(stderr, "Unexpected error joining dispatcher thread: %d", ret);
+				break;
+			}
+		}
+	}
+	free(dispatcher_threads);
+	free(env.worker_current_test);
+	free(data);
+
+	/* generate summary */
+	fflush(stderr);
+	fflush(stdout);
+
+	for (int i = 0; i < prog_test_cnt; i++) {
+		struct prog_test_def *current_test;
+		struct test_result *result;
+
+		current_test = &prog_test_defs[i];
+		result = &test_results[i];
+
+		if (!current_test->tested)
+			continue;
+
+		env.succ_cnt += result->error_cnt ? 0 : 1;
+		env.skip_cnt += result->skip_cnt;
+		if (result->error_cnt)
+			env.fail_cnt++;
+		env.sub_succ_cnt += result->sub_succ_cnt;
+	}
+
+	fprintf(stdout, "Summary: %d/%d PASSED, %d SKIPPED, %d FAILED\n",
+		env.succ_cnt, env.sub_succ_cnt, env.skip_cnt, env.fail_cnt);
+
+	print_all_error_logs();
+
+	/* reap all workers */
+	for (int i = 0; i < env.workers; i++) {
+		int wstatus, pid;
+
+		pid = waitpid(env.worker_pids[i], &wstatus, 0);
+		if (pid != env.worker_pids[i])
+			perror("Unable to reap worker");
+	}
+
+	return 0;
+}
+
+/*
+ * Command loop of a forked worker process.
+ *
+ * @sock: worker end of the socket shared with the main process.
+ *
+ * Saves the network namespace, then serves messages until told to stop:
+ *   - MSG_EXIT: leave the loop.
+ *   - MSG_DO_TEST: run the requested test with stdio captured, answer with
+ *     MSG_TEST_DONE carrying the test's counters, and, when a log has to be
+ *     reported (verbose mode, force_log or a failure, and the capture is
+ *     non-empty), stream it back in MSG_TEST_LOG chunks of at most
+ *     MAX_LOG_TRUNK_SIZE bytes, flagging the final chunk with is_last.
+ *   - anything else: return -1.
+ *
+ * Returns 0 when the loop ends through MSG_EXIT or a receive/send failure.
+ */
+static int worker_main(int sock)
+{
+	save_netns();
+
+	while (true) {
+		/* receive command */
+		struct msg msg;
+
+		if (recv_message(sock, &msg) < 0)
+			goto out;
+
+		switch (msg.type) {
+		case MSG_EXIT:
+			if (env.debug)
+				fprintf(stderr, "[%d]: worker exit.\n",
+					env.worker_id);
+			goto out;
+		case MSG_DO_TEST: {
+			int test_to_run;
+			struct prog_test_def *test;
+			struct msg msg_done;
+
+			test_to_run = msg.do_test.test_num;
+			test = &prog_test_defs[test_to_run];
+
+			if (env.debug)
+				fprintf(stderr, "[%d]: #%d:%s running.\n",
+					env.worker_id,
+					test_to_run + 1,
+					test->test_name);
+
+			stdio_hijack();
+
+			run_one_test(test_to_run);
+
+			stdio_restore();
+
+			memset(&msg_done, 0, sizeof(msg_done));
+			msg_done.type = MSG_TEST_DONE;
+			msg_done.test_done.test_num = test_to_run;
+			msg_done.test_done.error_cnt = test->error_cnt;
+			msg_done.test_done.skip_cnt = test->skip_cnt;
+			msg_done.test_done.sub_succ_cnt = test->sub_succ_cnt;
+			msg_done.test_done.have_log = false;
+
+			if (env.verbosity > VERBOSE_NONE || test->force_log || test->error_cnt) {
+				if (env.log_cnt)
+					msg_done.test_done.have_log = true;
+			}
+			if (send_message(sock, &msg_done) < 0) {
+				perror("Fail to send message done");
+				goto out;
+			}
+
+			/* send logs */
+			if (msg_done.test_done.have_log) {
+				char *src;
+				size_t slen;
+
+				src = env.log_buf;
+				slen = env.log_cnt;
+				while (slen) {
+					struct msg msg_log;
+					char *dest;
+					size_t len;
+
+					/* zeroing also NUL-terminates the chunk for the receiver */
+					memset(&msg_log, 0, sizeof(msg_log));
+					msg_log.type = MSG_TEST_LOG;
+					dest = msg_log.test_log.log_buf;
+					len = slen >= MAX_LOG_TRUNK_SIZE ? MAX_LOG_TRUNK_SIZE : slen;
+					memcpy(dest, src, len);
+
+					src += len;
+					slen -= len;
+					if (!slen)
+						msg_log.test_log.is_last = true;
+
+					/*
+					 * The send must not live inside assert():
+					 * with NDEBUG it would be compiled out and
+					 * the log would never reach the main process.
+					 */
+					if (send_message(sock, &msg_log) < 0) {
+						perror("Fail to send message log");
+						free(env.log_buf);
+						env.log_buf = NULL;
+						env.log_cnt = 0;
+						goto out;
+					}
+				}
+			}
+			/* the capture buffer is per test; release it before the next one */
+			if (env.log_buf) {
+				free(env.log_buf);
+				env.log_buf = NULL;
+				env.log_cnt = 0;
+			}
+			if (env.debug)
+				fprintf(stderr, "[%d]: #%d:%s done.\n",
+					env.worker_id,
+					test_to_run + 1,
+					test->test_name);
+			break;
+		} /* case MSG_DO_TEST */
+		default:
+			if (env.debug)
+				fprintf(stderr, "[%d]: unknown message.\n", env.worker_id);
+			return -1;
+		}
+	}
+out:
+	return 0;
+}
+
int main(int argc, char **argv)
{
static const struct argp argp = {
@@ -809,7 +1279,7 @@ int main(int argc, char **argv)
struct sigaction sigact = {
.sa_handler = crash_handler,
.sa_flags = SA_RESETHAND,
- };
+ };
int err, i;
sigaction(SIGSEGV, &sigact, NULL);
@@ -837,21 +1307,77 @@ int main(int argc, char **argv)
return -1;
}
- save_netns();
- stdio_hijack();
+ env.stdout = stdout;
+ env.stderr = stderr;
+
env.has_testmod = true;
if (!env.list_test_names && load_bpf_testmod()) {
fprintf(env.stderr, "WARNING! Selftests relying on bpf_testmod.ko will be skipped.\n");
env.has_testmod = false;
}
+
+ /* initializing tests */
for (i = 0; i < prog_test_cnt; i++) {
struct prog_test_def *test = &prog_test_defs[i];
- env.test = test;
test->test_num = i + 1;
-
- if (!should_run(&env.test_selector,
+ if (should_run(&env.test_selector,
test->test_num, test->test_name))
+ test->should_run = true;
+ else
+ test->should_run = false;
+ }
+
+ /* ignore workers if we are just listing */
+ if (env.get_test_cnt || env.list_test_names)
+ env.workers = 0;
+
+ /* launch workers if requested */
+ env.worker_id = -1; /* main process */
+ if (env.workers) {
+ env.worker_pids = calloc(sizeof(__pid_t), env.workers);
+ env.worker_socks = calloc(sizeof(int), env.workers);
+ if (env.debug)
+ fprintf(stdout, "Launching %d workers.\n", env.workers);
+ for (int i = 0; i < env.workers; i++) {
+ int sv[2];
+ pid_t pid;
+
+ if (socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, sv) < 0) {
+ perror("Fail to create worker socket");
+ return -1;
+ }
+ pid = fork();
+ if (pid < 0) {
+ perror("Failed to fork worker");
+ return -1;
+ } else if (pid != 0) { /* main process */
+ close(sv[1]);
+ env.worker_pids[i] = pid;
+ env.worker_socks[i] = sv[0];
+ } else { /* inside each worker process */
+ close(sv[0]);
+ env.worker_id = i;
+ return worker_main(sv[1]);
+ }
+ }
+
+ if (env.worker_id == -1) {
+ server_main();
+ goto out;
+ }
+ }
+
+ /* The rest of the main process */
+
+ /* on single mode */
+ save_netns();
+
+ for (i = 0; i < prog_test_cnt; i++) {
+ struct prog_test_def *test = &prog_test_defs[i];
+ struct test_result *result;
+
+ if (!test->should_run)
continue;
if (env.get_test_cnt) {
@@ -865,33 +1391,35 @@ int main(int argc, char **argv)
continue;
}
- test->run_test();
- /* ensure last sub-test is finalized properly */
- if (test->subtest_name)
- test__end_subtest();
+ stdio_hijack();
- test->tested = true;
+ run_one_test(i);
- dump_test_log(test, test->error_cnt);
+ stdio_restore();
fprintf(env.stdout, "#%d %s:%s\n",
test->test_num, test->test_name,
test->error_cnt ? "FAIL" : (test->skip_cnt ? "SKIP" : "OK"));
+ result = &test_results[i];
+ result->error_cnt = test->error_cnt;
+ if (env.log_buf) {
+ result->log_buf = strdup(env.log_buf);
+ result->log_cnt = env.log_cnt;
+
+ free(env.log_buf);
+ env.log_buf = NULL;
+ env.log_cnt = 0;
+ }
+
if (test->error_cnt)
env.fail_cnt++;
else
env.succ_cnt++;
- skip_account();
- reset_affinity();
- restore_netns();
- if (test->need_cgroup_cleanup)
- cleanup_cgroup_environment();
+ skip_account();
+ env.sub_succ_cnt += test->sub_succ_cnt;
}
- if (!env.list_test_names && env.has_testmod)
- unload_bpf_testmod();
- stdio_restore();
if (env.get_test_cnt) {
printf("%d\n", env.succ_cnt);
@@ -904,14 +1432,18 @@ int main(int argc, char **argv)
fprintf(stdout, "Summary: %d/%d PASSED, %d SKIPPED, %d FAILED\n",
env.succ_cnt, env.sub_succ_cnt, env.skip_cnt, env.fail_cnt);
+ print_all_error_logs();
+
+ close(env.saved_netns_fd);
out:
+ if (!env.list_test_names && env.has_testmod)
+ unload_bpf_testmod();
free_str_set(&env.test_selector.blacklist);
free_str_set(&env.test_selector.whitelist);
free(env.test_selector.num_set);
free_str_set(&env.subtest_selector.blacklist);
free_str_set(&env.subtest_selector.whitelist);
free(env.subtest_selector.num_set);
- close(env.saved_netns_fd);
if (env.succ_cnt + env.fail_cnt + env.skip_cnt == 0)
return EXIT_NO_TEST;
@@ -62,6 +62,7 @@ struct test_env {
struct test_selector test_selector;
struct test_selector subtest_selector;
bool verifier_stats;
+ bool debug;
enum verbosity verbosity;
bool jit_enabled;
@@ -69,7 +70,8 @@ struct test_env {
bool get_test_cnt;
bool list_test_names;
- struct prog_test_def *test;
+ struct prog_test_def *test; /* current running tests */
+
FILE *stdout;
FILE *stderr;
char *log_buf;
@@ -82,6 +84,38 @@ struct test_env {
int skip_cnt; /* skipped tests */
int saved_netns_fd;
+ int workers; /* number of worker process */
+ int worker_id; /* id number of current worker, main process is -1 */
+ pid_t *worker_pids; /* array of worker pids */
+ int *worker_socks; /* array of worker socks */
+ int *worker_current_test; /* array of current running test for each worker */
+};
+
+#define MAX_LOG_TRUNK_SIZE 8192
+/* discriminator for struct msg, exchanged between dispatcher threads and workers */
+enum msg_type {
+ /* dispatcher -> worker: run the test named in do_test */
+ MSG_DO_TEST = 0,
+ /* worker -> dispatcher: test finished, counters are in test_done */
+ MSG_TEST_DONE = 1,
+ /* worker -> dispatcher: one chunk of the captured log, in test_log */
+ MSG_TEST_LOG = 2,
+ /* dispatcher -> worker: leave the command loop */
+ MSG_EXIT = 255,
+};
+/*
+ * Fixed-size message sent whole over the worker sockets; the union member
+ * that is valid is selected by type.
+ */
+struct msg {
+ enum msg_type type;
+ union {
+ /* payload of MSG_DO_TEST */
+ struct {
+ /* zero-based index into prog_test_defs of the test to run */
+ int test_num;
+ } do_test;
+ /* payload of MSG_TEST_DONE */
+ struct {
+ /* index of the finished test; the dispatcher checks it against its request */
+ int test_num;
+ int sub_succ_cnt;
+ int error_cnt;
+ int skip_cnt;
+ /* true when MSG_TEST_LOG messages follow this one */
+ bool have_log;
+ } test_done;
+ /* payload of MSG_TEST_LOG */
+ struct {
+ /* up to MAX_LOG_TRUNK_SIZE log bytes; the extra byte keeps room for a NUL */
+ char log_buf[MAX_LOG_TRUNK_SIZE + 1];
+ /* true on the final chunk of a test's log */
+ bool is_last;
+ } test_log;
+ };
};
extern struct test_env env;