Message ID | 20210913193906.2813357-3-fallentree@fb.com (mailing list archive) |
---|---|
State | Superseded |
Delegated to: | BPF |
Series | [v4,bpf-next,1/3] selftests/bpf: Add parallelism to test_progs |
On Mon, Sep 13, 2021 at 12:39 PM Yucong Sun <fallentree@fb.com> wrote:
>
> From: Yucong Sun <sunyucong@gmail.com>
>
> This patch adds a simple name list to pin some tests that fail to run in
> parallel to worker 0. On encountering these tests, all other threads will wait
> on a conditional variable, which worker 0 will signal once the tests has
> finished running.
>
> Additionally, before running the test, thread 0 also check and wait until all
> other threads has finished their work, to make sure the pinned test really are
> the only test running in the system.
>
> After this change, all tests should pass in '-j' mode.
>
> Signed-off-by: Yucong Sun <sunyucong@gmail.com>
> ---
>  tools/testing/selftests/bpf/test_progs.c | 109 ++++++++++++++++++++---
>  1 file changed, 97 insertions(+), 12 deletions(-)
>
> diff --git a/tools/testing/selftests/bpf/test_progs.c b/tools/testing/selftests/bpf/test_progs.c
> index f0eeb17c348d..dc72b3f526a6 100644
> --- a/tools/testing/selftests/bpf/test_progs.c
> +++ b/tools/testing/selftests/bpf/test_progs.c
> @@ -18,6 +18,16 @@
>  #include <sys/socket.h>
>  #include <sys/un.h>
>
> +char *TESTS_MUST_SERIALIZE[] = {
> +       "netcnt",
> +       "select_reuseport",
> +       "sockmap_listen",
> +       "tc_redirect",
> +       "xdp_bonding",
> +       "xdp_info",
> +       NULL,
> +};
> +

I was actually thinking to encode this as part of the test function
name itself. I.e.,

void test_vmlinux(void) for parallelizable tests

and

void serial_test_vmlinux(void)

Then we can use weak symbols to "detect" which one is actually defined
for any given test:

struct prog_test_def {
    void (*run_test)(void);
    void (*run_test_serial)(void);
    ...
};

then test_progs.c will define

#define DEFINE_TEST(name) \
    extern void test_##name(void) __weak; \
    extern void serial_test_##name(void) __weak;

and that prog_test_def (through another DEFINE_TEST macro definition)
will be initialized as

{
    .test_name = #name,
    .run_test = &test_##name,
    .run_test_serial = &serial_test_##name,
}

After all that, checking which of .run_test or .run_test_serial isn't
NULL (and erroring out if both or neither) will determine whether the
test is serial or parallel. Keeping this knowledge next to the test itself
is nice in that it will never get out of sync, will never be
mismatched, etc. (and it's very obvious when looking at the test file
itself).

Thoughts?

[...]
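
For reference, the weak-symbol detection proposed above can be sketched as one self-contained file. This is an illustrative sketch under stated assumptions, not code from the patch series: `DECLARE_TEST`, `enum test_kind`, `classify_test()` and the `missing` test are hypothetical names, all tests live in a single translation unit here (in test_progs they would be separate files), and it assumes GCC or Clang on an ELF target such as Linux, where an undefined weak symbol resolves to a NULL address.

```c
#include <assert.h>
#include <stddef.h>

#ifndef __weak
#define __weak __attribute__((weak))
#endif

/* Hypothetical helper: declare both variants weak so either may be absent. */
#define DECLARE_TEST(name) \
	extern void test_##name(void) __weak; \
	extern void serial_test_##name(void) __weak;

struct prog_test_def {
	const char *test_name;
	void (*run_test)(void);
	void (*run_test_serial)(void);
};

/* An undefined weak symbol yields a NULL function pointer here. */
#define DEFINE_TEST(name) { \
	.test_name = #name, \
	.run_test = &test_##name, \
	.run_test_serial = &serial_test_##name, \
}

DECLARE_TEST(vmlinux)
DECLARE_TEST(netcnt)
DECLARE_TEST(missing)	/* hypothetical: neither variant is defined */

/* A parallelizable test defines only test_NAME(). */
void test_vmlinux(void) { }

/* A test that must run alone defines only serial_test_NAME(). */
void serial_test_netcnt(void) { }

struct prog_test_def prog_test_defs[] = {
	DEFINE_TEST(vmlinux),
	DEFINE_TEST(netcnt),
	DEFINE_TEST(missing),
};

enum test_kind { TEST_INVALID, TEST_PARALLEL, TEST_SERIAL };

/* Exactly one of the two pointers must be set; both or neither is an error. */
enum test_kind classify_test(const struct prog_test_def *t)
{
	assert(t != NULL);
	if (t->run_test && t->run_test_serial)
		return TEST_INVALID;
	if (t->run_test)
		return TEST_PARALLEL;
	if (t->run_test_serial)
		return TEST_SERIAL;
	return TEST_INVALID;
}
```

With this layout the serial/parallel decision lives in the test's own function name, so no separate name list has to be kept in sync.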
On Tue, Sep 14, 2021 at 3:23 AM Andrii Nakryiko <andrii.nakryiko@gmail.com> wrote:
>
> [...]
>
> After all that checking which of .run_test or .run_test_serial isn't
> NULL (and erroring out if both or neither) will determine whether the
> test is serial or parallel. Keeping this knowledge next to test itself
> is nice in that it will never get out of sync, will never be
> mismatched, etc (and it's very obvious when looking at the test file
> itself).
>
> Thoughts?

Great idea! I ended up doing "serial_test_NAME()" and "run_serial_test()", but the idea is the same.

>
> [...]
diff --git a/tools/testing/selftests/bpf/test_progs.c b/tools/testing/selftests/bpf/test_progs.c
index f0eeb17c348d..dc72b3f526a6 100644
--- a/tools/testing/selftests/bpf/test_progs.c
+++ b/tools/testing/selftests/bpf/test_progs.c
@@ -18,6 +18,16 @@
 #include <sys/socket.h>
 #include <sys/un.h>
 
+char *TESTS_MUST_SERIALIZE[] = {
+	"netcnt",
+	"select_reuseport",
+	"sockmap_listen",
+	"tc_redirect",
+	"xdp_bonding",
+	"xdp_info",
+	NULL,
+};
+
 /* Adapted from perf/util/string.c */
 static bool glob_match(const char *str, const char *pat)
 {
@@ -821,6 +831,7 @@ void crash_handler(int signum)
 
 int current_test_idx = 0;
 pthread_mutex_t current_test_lock;
+pthread_cond_t wait_for_worker0 = PTHREAD_COND_INITIALIZER;
 
 struct test_result {
 	int error_cnt;
@@ -887,6 +898,29 @@ struct dispatch_data {
 	int sock;
 };
 
+static const char *get_test_name(int idx)
+{
+	struct prog_test_def *test;
+
+	test = &prog_test_defs[idx];
+	return test->test_name;
+}
+
+bool is_test_must_serialize(int idx)
+{
+	struct prog_test_def *test;
+	char **p;
+
+	test = &prog_test_defs[idx];
+	p = &TESTS_MUST_SERIALIZE[0];
+	while (*p != NULL) {
+		if (strcmp(*p, test->test_name) == 0)
+			return true;
+		p++;
+	}
+	return false;
+}
+
 void *dispatch_thread(void *_data)
 {
 	struct dispatch_data *data;
@@ -901,6 +935,8 @@ void *dispatch_thread(void *_data)
 		struct prog_test_def *test;
 		struct test_result *result;
 
+		env.worker_current_test[data->idx] = -1;
+
 		/* grab a test */
 		{
 			pthread_mutex_lock(&current_test_lock);
@@ -909,8 +945,31 @@ void *dispatch_thread(void *_data)
 				pthread_mutex_unlock(&current_test_lock);
 				goto done;
 			}
+
 			test_to_run = current_test_idx;
-			current_test_idx++;
+
+			if (is_test_must_serialize(current_test_idx)) {
+				if (data->idx != 0) {
+					fprintf(stderr, "[%d]: Waiting for thread 0 to finish serialized test: %d.\n",
+						data->idx, current_test_idx + 1);
+					/* wait for worker 0 to pick this job up and finish */
+					pthread_cond_wait(&wait_for_worker0, &current_test_lock);
+					pthread_mutex_unlock(&current_test_lock);
+					goto next;
+				} else {
+					/* wait until all other worker has parked */
+					for (int i = 1; i < env.workers; i++) {
+						if (env.worker_current_test[i] != -1) {
+							fprintf(stderr, "[%d]: Waiting for other threads to finish current test...\n", data->idx);
+							pthread_mutex_unlock(&current_test_lock);
+							usleep(1 * 1000 * 1000);
+							goto next;
+						}
+					}
+				}
+			} else {
+				current_test_idx++;
+			}
 
 			pthread_mutex_unlock(&current_test_lock);
 		}
@@ -975,7 +1034,15 @@ void *dispatch_thread(void *_data)
 				fclose(log_fd);
 				log_fd = NULL;
 			}
+		} /* wait for test done */
+
+		/* unblock all other dispatcher threads */
+		if (is_test_must_serialize(test_to_run) && data->idx == 0) {
+			current_test_idx++;
+			pthread_cond_broadcast(&wait_for_worker0);
 		}
+next:
+		continue;
 	} /* while (true) */
 error:
 	fprintf(stderr, "[%d]: Protocol/IO error: %s", data->idx, strerror(errno));
@@ -997,16 +1064,19 @@ int server_main(void)
 {
 	pthread_t *dispatcher_threads;
 	struct dispatch_data *data;
+	int all_finished = false;
 
 	dispatcher_threads = calloc(sizeof(pthread_t), env.workers);
 	data = calloc(sizeof(struct dispatch_data), env.workers);
 
 	env.worker_current_test = calloc(sizeof(int), env.workers);
+
 	for (int i = 0; i < env.workers; i++) {
 		int rc;
 
 		data[i].idx = i;
 		data[i].sock = env.worker_socks[i];
+		env.worker_current_test[i] = -1;
 		rc = pthread_create(&dispatcher_threads[i], NULL, dispatch_thread, &data[i]);
 		if (rc < 0) {
 			perror("Failed to launch dispatcher thread");
@@ -1015,17 +1085,27 @@ int server_main(void)
 	}
 
 	/* wait for all dispatcher to finish */
-	for (int i = 0; i < env.workers; i++) {
-		while (true) {
-			struct timespec timeout = {
-				.tv_sec = time(NULL) + 5,
-				.tv_nsec = 0
-			};
-			if (pthread_timedjoin_np(dispatcher_threads[i], NULL, &timeout) != ETIMEDOUT)
-				break;
-			fprintf(stderr, "Still waiting for thread %d (test %d).\n", i, env.worker_current_test[i] + 1);
+	while (!all_finished) {
+		all_finished = true;
+		for (int i = 0; i < env.workers; i++) {
+			if (!dispatcher_threads[i])
+				continue;
+
+			if (pthread_tryjoin_np(dispatcher_threads[i], NULL) == EBUSY) {
+				all_finished = false;
+				if (env.worker_current_test[i] == -1)
+					fprintf(stderr, "Still waiting for thread %d (blocked by thread 0).\n", i);
+				else
+					fprintf(stderr, "Still waiting for thread %d (test #%d:%s).\n",
+						i, env.worker_current_test[i] + 1,
+						get_test_name(env.worker_current_test[i]));
+			} else {
+				dispatcher_threads[i] = 0;
+			}
 		}
+		usleep(10 * 1000 * 1000);
 	}
+
 	free(dispatcher_threads);
 	free(env.worker_current_test);
 	free(data);
@@ -1100,8 +1180,11 @@ int worker_main(int sock)
 
 			test_to_run = msg.u.message_do_test.num;
 
-			fprintf(stderr, "[%d]: Running test %d.\n",
-				env.worker_index, test_to_run + 1);
+			if (env.verbosity > VERBOSE_NONE)
+				fprintf(stderr, "[%d]: #%d:%s running.\n",
+					env.worker_index,
+					test_to_run + 1,
+					get_test_name(test_to_run));
 
 			test = &prog_test_defs[test_to_run];
 
@@ -1172,6 +1255,8 @@ int worker_main(int sock)
 				env.log_buf = NULL;
 				env.log_cnt = 0;
 			}
+			fprintf(stderr, "[%d]: #%d:%s done.\n",
+				env.worker_index, test_to_run + 1, get_test_name(test_to_run));
 			break;
 		} /* case MSG_DO_TEST */
 		default:
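
The parking scheme in the diff above (workers other than 0 wait on a condition variable until worker 0 finishes the serialized test) can also be written with an explicit predicate checked in a loop, which is the usual guard against spurious wakeups. What follows is only a minimal standalone sketch of that pattern, not part of the patch: `struct serial_gate`, `parked_worker()`, `run_gate_demo()` and `MAX_WORKERS` are hypothetical names, and the build is assumed to use `-pthread` where the C library requires it.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define MAX_WORKERS 16	/* hypothetical upper bound for this sketch */

struct serial_gate {
	pthread_mutex_t lock;
	pthread_cond_t cond;
	bool serial_done;	/* predicate, protected by lock */
	int released;		/* workers that got past the gate */
};

/* Stand-in for workers 1..N: park until the serialized test is done. */
static void *parked_worker(void *arg)
{
	struct serial_gate *g = arg;

	pthread_mutex_lock(&g->lock);
	while (!g->serial_done)	/* re-check after every wakeup */
		pthread_cond_wait(&g->cond, &g->lock);
	g->released++;
	pthread_mutex_unlock(&g->lock);
	return NULL;
}

/* Returns the number of released workers, or -1 if a thread failed to start. */
int run_gate_demo(int nworkers)
{
	struct serial_gate g = { .serial_done = false, .released = 0 };
	pthread_t threads[MAX_WORKERS];
	int started = 0;

	assert(nworkers >= 0 && nworkers <= MAX_WORKERS);
	pthread_mutex_init(&g.lock, NULL);
	pthread_cond_init(&g.cond, NULL);

	for (int i = 0; i < nworkers; i++) {
		if (pthread_create(&threads[started], NULL, parked_worker, &g) != 0)
			break;
		started++;
	}

	/* "Worker 0": the serialized test would run here, with nobody else active. */

	pthread_mutex_lock(&g.lock);
	g.serial_done = true;	/* update the predicate under the lock ... */
	pthread_cond_broadcast(&g.cond);	/* ... then wake every parked worker */
	pthread_mutex_unlock(&g.lock);

	for (int i = 0; i < started; i++)
		pthread_join(threads[i], NULL);

	pthread_cond_destroy(&g.cond);
	pthread_mutex_destroy(&g.lock);
	return started == nworkers ? g.released : -1;
}
```

Because the predicate is changed while holding the mutex, a worker that has not yet reached `pthread_cond_wait()` when the broadcast happens simply sees `serial_done` set and does not block.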