Message ID | 20221020232532.1128326-2-calvinwan@google.com |
---|---|
State | Superseded |
Series | submodule: parallelize diff |
On Thu, Oct 20 2022, Calvin Wan wrote:

> Add pipe_output_fn as an optionally set function in
> run_process_parallel_opts. If set, output from each child process is
> first separately stored in 'out' and then piped to the callback
> function when the child process finishes to allow for separate parsing.

The "when[...]finish[ed]" here seems a bit odd to me. Why doesn't the API
just stream this to callbacks as it comes in?

Then if a caller only cares about the output at the very end they can
manage that state between their streaming callbacks and "finish"
callback, i.e. buffer it & flush it themselves.

> diff --git a/run-command.c b/run-command.c
> index c772acd743..03787bc7f5 100644
> --- a/run-command.c
> +++ b/run-command.c
> @@ -1503,6 +1503,7 @@ struct parallel_processes {
>  		enum child_state state;
>  		struct child_process process;
>  		struct strbuf err;
> +		struct strbuf out;
>  		void *data;
>  	} *children;
>  	/*
> @@ -1560,6 +1561,9 @@ static void pp_init(struct parallel_processes *pp,
>
>  	if (!opts->get_next_task)
>  		BUG("you need to specify a get_next_task function");
> +
> +	if (opts->pipe_output && opts->ungroup)
> +		BUG("pipe_output and ungroup are incompatible with each other");
>
>  	CALLOC_ARRAY(pp->children, n);
>  	if (!opts->ungroup)
> @@ -1567,6 +1571,8 @@ static void pp_init(struct parallel_processes *pp,
>
>  	for (size_t i = 0; i < n; i++) {
>  		strbuf_init(&pp->children[i].err, 0);
> +		if (opts->pipe_output)
> +			strbuf_init(&pp->children[i].out, 0);

Even if we're not using this, let's init it for simplicity. We don't use
the "err" with ungroup and we're init-ing that, and...

>  		child_process_init(&pp->children[i].process);
>  		if (pp->pfd) {
>  			pp->pfd[i].events = POLLIN | POLLHUP;
> @@ -1586,6 +1592,7 @@ static void pp_cleanup(struct parallel_processes *pp,
>  	trace_printf("run_processes_parallel: done");
>  	for (size_t i = 0; i < opts->processes; i++) {
>  		strbuf_release(&pp->children[i].err);
> +		strbuf_release(&pp->children[i].out);

...here you're strbuf_release()-ing a string that was never init'd. It's
not segfaulting because we check sb->alloc, and since we calloc'd this
whole thing it'll be 0, but let's just init it so it's a proper strbuf
(with slopbuf). It's cheap.

> +/**
> + * This callback is called on every child process that finished processing.
> + *
> + * "struct strbuf *process_out" contains the output from the finished child
> + * process.
> + *
> + * pp_cb is the callback cookie as passed into run_processes_parallel,
> + * pp_task_cb is the callback cookie as passed into get_next_task_fn.
> + *
> + * This function is incompatible with "ungroup"
> + */
> +typedef void (*pipe_output_fn)(struct strbuf *process_out,
> +			       void *pp_cb,
> +			       void *pp_task_cb);
> +
>  /**
>   * This callback is called on every child process that finished processing.
>   *
> @@ -493,6 +508,12 @@ struct run_process_parallel_opts
>  	 */
>  	start_failure_fn start_failure;
>
> +	/**
> +	 * pipe_output: See pipe_output_fn() above. This should be
> +	 * NULL unless process specific output is needed
> +	 */
> +	pipe_output_fn pipe_output;
> +
>  	/**
>  	 * task_finished: See task_finished_fn() above. This can be
>  	 * NULL to omit any special handling.
> diff --git a/t/helper/test-run-command.c b/t/helper/test-run-command.c
> index 3ecb830f4a..e9b41419a0 100644
> --- a/t/helper/test-run-command.c
> +++ b/t/helper/test-run-command.c
> @@ -52,6 +52,13 @@ static int no_job(struct child_process *cp,
>  	return 0;
>  }
>
> +static void pipe_output(struct strbuf *process_out,
> +			void *pp_cb,
> +			void *pp_task_cb)
> +{
> +	fprintf(stderr, "%s", process_out->buf);

Maybe print this with split lines prefixed with something so your tests
can see that something actually happened here, & test_cmp it so we can
see what went where, as opposed to...

> +test_expect_success 'run_command runs in parallel with more jobs available than tasks --pipe-output' '
> +	test-tool run-command --pipe-output run-command-parallel 5 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
> +	test_must_be_empty out &&
> +	test_line_count = 20 err
> +'

Just checking the number of lines, which seems to leave a lot of leeway
for the output being mixed up in all sorts of ways & the test to still
pass.

(ditto below)
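Ævar's point about `strbuf_release()` on an entry that was zeroed by `CALLOC_ARRAY()` but never initialized can be illustrated with a toy buffer. This is a minimal sketch, not git's `strbuf` code: `struct mini_buf`, `mini_slopbuf` and the `mini_buf_*()` helpers are hypothetical names that mimic the behaviour described in the review (release frees only when `alloc` is non-zero, and an initialized buffer points at a shared empty string instead of `NULL`).

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical miniature of a strbuf-like type, for illustration only. */
struct mini_buf {
	size_t alloc; /* bytes owned by buf; 0 means buf is not heap memory */
	size_t len;   /* bytes in use, excluding the trailing NUL */
	char *buf;
};

/* Shared, never-freed empty string (the "slopbuf" idea). */
static char mini_slopbuf[1];

static void mini_buf_init(struct mini_buf *sb)
{
	sb->alloc = 0;
	sb->len = 0;
	sb->buf = mini_slopbuf;
}

static void mini_buf_addstr(struct mini_buf *sb, const char *s)
{
	size_t n = strlen(s);
	size_t want = sb->len + n + 1;

	/* A buffer that owns no memory must be empty. */
	assert(sb->alloc ? sb->len < sb->alloc : sb->len == 0);
	if (want > sb->alloc) {
		/* Never hand the shared slopbuf to realloc(). */
		char *p = realloc(sb->alloc ? sb->buf : NULL, want);
		if (!p)
			abort();
		sb->buf = p;
		sb->alloc = want;
	}
	memcpy(sb->buf + sb->len, s, n + 1);
	sb->len += n;
}

static void mini_buf_release(struct mini_buf *sb)
{
	/* Free only what was actually allocated, then reset to empty. */
	if (sb->alloc) {
		free(sb->buf);
		mini_buf_init(sb);
	}
}
```

The zero-filled buffer survives release only because `alloc` is 0; its `buf` is still `NULL`, so reading it as a string would be undefined behaviour. Initializing every entry up front, as suggested, leaves each one a valid empty string at negligible cost.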
Calvin Wan <calvinwan@google.com> writes:

> Add pipe_output_fn as an optionally set function in
> run_process_parallel_opts. If set, output from each child process is
> first separately stored in 'out' and then piped to the callback
> function when the child process finishes to allow for separate parsing.

In my review of one of the previous rounds, I asked which part of
this functionality fits the name "pipe", and I do not think I got a
satisfactory answer. And after re-reading the patch in this round,
with the in-header comments, it still is not clear to me.

It looks more like sending the duplicate of the normal output to a
side channel, somewhat like the "tee" utility, but I am not sure if
that is the intended way to be used.
> In my review of one of the previous rounds, I asked which part of
> this functionality fits the name "pipe", and I do not think I got a
> satisfactory answer. And after re-reading the patch in this round,
> with the in-header comments, it still is not clear to me.
>
> It looks more like sending the duplicate of the normal output to a
> side channel, somewhat like the "tee" utility, but I am not sure if
> that is the intended way to be used.
>

In this case, I was hoping "pipe" would refer to the redirection of
output from the child processes to a separate custom function, but
I can see that duplication != redirection. Maybe something like
"parse_child_output" or "parse_output" would make sense, however,
I didn't want to imply with that name that the only functionality is to
parse output. Besides that, I don't really have any other ideas of
what I can name it...
On Thu, Oct 20, 2022 at 8:31 PM Ævar Arnfjörð Bjarmason <avarab@gmail.com> wrote:
>
>
> On Thu, Oct 20 2022, Calvin Wan wrote:
>
> > Add pipe_output_fn as an optionally set function in
> > run_process_parallel_opts. If set, output from each child process is
> > first separately stored in 'out' and then piped to the callback
> > function when the child process finishes to allow for separate parsing.
>
> The "when[...]finish[ed]" here seems a bit odd to me. Why doesn't the API
> just stream this to callbacks as it comes in?
>
> Then if a caller only cares about the output at the very end they can
> manage that state between their streaming callbacks and "finish"
> callback, i.e. buffer it & flush it themselves.

That's a good idea. This also lets me remove the 'out' variable from
parallel_processes.children.

>
> > diff --git a/run-command.c b/run-command.c
> > index c772acd743..03787bc7f5 100644
> > --- a/run-command.c
> > +++ b/run-command.c
> > @@ -1503,6 +1503,7 @@ struct parallel_processes {
> >  		enum child_state state;
> >  		struct child_process process;
> >  		struct strbuf err;
> > +		struct strbuf out;
> >  		void *data;
> >  	} *children;
> >  	/*
> > @@ -1560,6 +1561,9 @@ static void pp_init(struct parallel_processes *pp,
> >
> >  	if (!opts->get_next_task)
> >  		BUG("you need to specify a get_next_task function");
> > +
> > +	if (opts->pipe_output && opts->ungroup)
> > +		BUG("pipe_output and ungroup are incompatible with each other");
> >
> >  	CALLOC_ARRAY(pp->children, n);
> >  	if (!opts->ungroup)
> > @@ -1567,6 +1571,8 @@ static void pp_init(struct parallel_processes *pp,
> >
> >  	for (size_t i = 0; i < n; i++) {
> >  		strbuf_init(&pp->children[i].err, 0);
> > +		if (opts->pipe_output)
> > +			strbuf_init(&pp->children[i].out, 0);
>
> Even if we're not using this, let's init it for simplicity. We don't use
> the "err" with ungroup and we're init-ing that, and...

ack.

>
> >  		child_process_init(&pp->children[i].process);
> >  		if (pp->pfd) {
> >  			pp->pfd[i].events = POLLIN | POLLHUP;
> > @@ -1586,6 +1592,7 @@ static void pp_cleanup(struct parallel_processes *pp,
> >  	trace_printf("run_processes_parallel: done");
> >  	for (size_t i = 0; i < opts->processes; i++) {
> >  		strbuf_release(&pp->children[i].err);
> > +		strbuf_release(&pp->children[i].out);
>
> ...here you're strbuf_release()-ing a string that was never init'd. It's
> not segfaulting because we check sb->alloc, and since we calloc'd this
> whole thing it'll be 0, but let's just init it so it's a proper strbuf
> (with slopbuf). It's cheap.

ack.

> > +/**
> > + * This callback is called on every child process that finished processing.
> > + *
> > + * "struct strbuf *process_out" contains the output from the finished child
> > + * process.
> > + *
> > + * pp_cb is the callback cookie as passed into run_processes_parallel,
> > + * pp_task_cb is the callback cookie as passed into get_next_task_fn.
> > + *
> > + * This function is incompatible with "ungroup"
> > + */
> > +typedef void (*pipe_output_fn)(struct strbuf *process_out,
> > +			       void *pp_cb,
> > +			       void *pp_task_cb);
> > +
> >  /**
> >   * This callback is called on every child process that finished processing.
> >   *
> > @@ -493,6 +508,12 @@ struct run_process_parallel_opts
> >  	 */
> >  	start_failure_fn start_failure;
> >
> > +	/**
> > +	 * pipe_output: See pipe_output_fn() above. This should be
> > +	 * NULL unless process specific output is needed
> > +	 */
> > +	pipe_output_fn pipe_output;
> > +
> >  	/**
> >  	 * task_finished: See task_finished_fn() above. This can be
> >  	 * NULL to omit any special handling.
> > diff --git a/t/helper/test-run-command.c b/t/helper/test-run-command.c
> > index 3ecb830f4a..e9b41419a0 100644
> > --- a/t/helper/test-run-command.c
> > +++ b/t/helper/test-run-command.c
> > @@ -52,6 +52,13 @@ static int no_job(struct child_process *cp,
> >  	return 0;
> >  }
> >
> > +static void pipe_output(struct strbuf *process_out,
> > +			void *pp_cb,
> > +			void *pp_task_cb)
> > +{
> > +	fprintf(stderr, "%s", process_out->buf);
>
> Maybe print this with split lines prefixed with something so your tests
> can see that something actually happened here, & test_cmp it so we can
> see what went where, as opposed to...
>
> > +test_expect_success 'run_command runs in parallel with more jobs available than tasks --pipe-output' '
> > +	test-tool run-command --pipe-output run-command-parallel 5 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
> > +	test_must_be_empty out &&
> > +	test_line_count = 20 err
> > +'
>
> Just checking the number of lines, which seems to leave a lot of leeway
> for the output being mixed up in all sorts of ways & the test to still
> pass.
>
> (ditto below)

ack.
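Ævar's testing suggestion, acked above, is for the helper's callback to prefix every line it prints so the test can compare the exact output with `test_cmp` rather than only count lines. A minimal sketch of the prefixing step in plain C follows; `prefix_lines()` and the prefix strings are hypothetical, and it fills a caller-supplied buffer instead of writing to `stderr` so the result is easy to check.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: copy each line of `in` into `out`, prepending
 * `prefix`, so a test can compare the result exactly instead of only
 * counting lines. A final line without '\n' gets one appended.
 * Returns the number of bytes written (excluding the NUL), or -1 if
 * `out` is too small.
 */
static int prefix_lines(const char *prefix, const char *in,
			char *out, size_t outsz)
{
	size_t used = 0;

	assert(prefix && in && out && outsz > 0);
	out[0] = '\0';
	while (*in) {
		const char *eol = strchr(in, '\n');
		size_t linelen = eol ? (size_t)(eol - in) : strlen(in);
		int n = snprintf(out + used, outsz - used, "%s%.*s\n",
				 prefix, (int)linelen, in);

		/* snprintf() reports the full length; treat truncation as failure. */
		if (n < 0 || (size_t)n >= outsz - used)
			return -1;
		used += (size_t)n;
		in += linelen + (eol ? 1 : 0);
	}
	return (int)used;
}
```

A helper callback could print the result with `fprintf(stderr, ...)`. Because the order in which parallel children finish may differ between runs, a test comparing whole files might still need to sort both sides first.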
Calvin Wan <calvinwan@google.com> writes:

>> In my review of one of the previous rounds, I asked which part of
>> this functionality fits the name "pipe", and I do not think I got a
>> satisfactory answer. And after re-reading the patch in this round,
>> with the in-header comments, it still is not clear to me.
>>
>> It looks more like sending the duplicate of the normal output to a
>> side channel, somewhat like the "tee" utility, but I am not sure if
>> that is the intended way to be used.
>>
>
> In this case, I was hoping "pipe" would refer to the redirection of
> output from the child processes to a separate custom function, but
> I can see that duplication != redirection. Maybe something like
> "parse_child_output" or "parse_output" would make sense, however,
> I didn't want to imply with that name that the only functionality is to
> parse output. Besides that, I don't really have any other ideas of
> what I can name it...

Yeah, parsing is not to the point. Sending a copy of output to
elsewhere is, so redirect is a better word than parse. And piping
is not the only form of redirection, either. If duplication is
really the point, then giving it a name with a word that
signals "duplication" would make more sense. "send_copy_fn"? I am
not good at naming.

As a name that is not end-user facing, it is tempting to assume that
the readers have basic knowledge of Unix concepts and call it
"tee_fn", but it would be way too cryptic to the uninitiated, so I would
not recommend it.

Hmm...
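The distinction being discussed, between redirecting output elsewhere and sending a copy of it elsewhere as `tee` does, can be made concrete with a small sketch. The names (`struct sink`, `enum output_mode`, `route_chunk()`) are hypothetical and the buffers are fixed-size for brevity. Judging by the diff, the patch as posted is the duplicating case: `pp_buffer_stderr()` always appends each chunk to `children[i].err` and also appends it to `children[i].out` when `pipe_output` is set.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical output destination backed by a fixed-size array. */
struct sink {
	char data[256];
	size_t len;
};

static void sink_write(struct sink *s, const char *bytes, size_t n)
{
	assert(s->len + n < sizeof(s->data));
	memcpy(s->data + s->len, bytes, n);
	s->len += n;
	s->data[s->len] = '\0';
}

enum output_mode {
	OUTPUT_REDIRECT,  /* side channel gets the bytes instead of normal output */
	OUTPUT_DUPLICATE  /* "tee": both destinations get the same bytes */
};

/* Route one chunk of a child's output according to mode. */
static void route_chunk(enum output_mode mode, struct sink *normal,
			struct sink *side, const char *bytes, size_t n)
{
	if (mode == OUTPUT_DUPLICATE)
		sink_write(normal, bytes, n);
	sink_write(side, bytes, n);
}
```

With `OUTPUT_DUPLICATE` the normal output is unchanged and the side channel merely observes it, which is the behaviour a name built around "copy" or "duplicate" would describe.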
On Mon, Oct 24, 2022 at 12:04 PM Junio C Hamano <gitster@pobox.com> wrote:
>
> Calvin Wan <calvinwan@google.com> writes:
>
> >> In my review of one of the previous rounds, I asked which part of
> >> this functionality fits the name "pipe", and I do not think I got a
> >> satisfactory answer. And after re-reading the patch in this round,
> >> with the in-header comments, it still is not clear to me.
> >>
> >> It looks more like sending the duplicate of the normal output to a
> >> side channel, somewhat like the "tee" utility, but I am not sure if
> >> that is the intended way to be used.
> >>
> >
> > In this case, I was hoping "pipe" would refer to the redirection of
> > output from the child processes to a separate custom function, but
> > I can see that duplication != redirection. Maybe something like
> > "parse_child_output" or "parse_output" would make sense, however,
> > I didn't want to imply with that name that the only functionality is to
> > parse output. Besides that, I don't really have any other ideas of
> > what I can name it...
>
> Yeah, parsing is not to the point. Sending a copy of output to
> elsewhere is, so redirect is a better word than parse. And piping
> is not the only form of redirection, either. If duplication is
> really the point, then giving it a name with a word that
> signals "duplication" would make more sense. "send_copy_fn"? I am
> not good at naming.
>
> As a name that is not end-user facing, it is tempting to assume that
> the readers have basic knowledge of Unix concepts and call it
> "tee_fn", but it would be way too cryptic to the uninitiated, so I would
> not recommend it.
>
> Hmm...

Throwing some more ideas out there:

split_duplicate_fn
duplicate_output_fn
dup_output_fn

As you mention, it's not end-user facing, so we should pick a name
that's close enough (and any confusion can always be resolved by
comments).
```diff
diff --git a/run-command.c b/run-command.c
index c772acd743..03787bc7f5 100644
--- a/run-command.c
+++ b/run-command.c
@@ -1503,6 +1503,7 @@ struct parallel_processes {
 		enum child_state state;
 		struct child_process process;
 		struct strbuf err;
+		struct strbuf out;
 		void *data;
 	} *children;
 	/*
@@ -1560,6 +1561,9 @@ static void pp_init(struct parallel_processes *pp,
 
 	if (!opts->get_next_task)
 		BUG("you need to specify a get_next_task function");
+
+	if (opts->pipe_output && opts->ungroup)
+		BUG("pipe_output and ungroup are incompatible with each other");
 
 	CALLOC_ARRAY(pp->children, n);
 	if (!opts->ungroup)
@@ -1567,6 +1571,8 @@ static void pp_init(struct parallel_processes *pp,
 
 	for (size_t i = 0; i < n; i++) {
 		strbuf_init(&pp->children[i].err, 0);
+		if (opts->pipe_output)
+			strbuf_init(&pp->children[i].out, 0);
 		child_process_init(&pp->children[i].process);
 		if (pp->pfd) {
 			pp->pfd[i].events = POLLIN | POLLHUP;
@@ -1586,6 +1592,7 @@ static void pp_cleanup(struct parallel_processes *pp,
 	trace_printf("run_processes_parallel: done");
 	for (size_t i = 0; i < opts->processes; i++) {
 		strbuf_release(&pp->children[i].err);
+		strbuf_release(&pp->children[i].out);
 		child_process_clear(&pp->children[i].process);
 	}
 
@@ -1680,8 +1687,12 @@ static void pp_buffer_stderr(struct parallel_processes *pp,
 	for (size_t i = 0; i < opts->processes; i++) {
 		if (pp->children[i].state == GIT_CP_WORKING &&
 		    pp->pfd[i].revents & (POLLIN | POLLHUP)) {
-			int n = strbuf_read_once(&pp->children[i].err,
-						 pp->children[i].process.err, 0);
+			struct strbuf buf = STRBUF_INIT;
+			int n = strbuf_read_once(&buf, pp->children[i].process.err, 0);
+			strbuf_addbuf(&pp->children[i].err, &buf);
+			if (opts->pipe_output)
+				strbuf_addbuf(&pp->children[i].out, &buf);
+			strbuf_release(&buf);
 			if (n == 0) {
 				close(pp->children[i].process.err);
 				pp->children[i].state = GIT_CP_WAIT_CLEANUP;
@@ -1717,6 +1728,12 @@ static int pp_collect_finished(struct parallel_processes *pp,
 		if (i == opts->processes)
 			break;
 
+		if (opts->pipe_output) {
+			opts->pipe_output(&pp->children[i].out, opts->data,
+					  pp->children[i].data);
+			strbuf_reset(&pp->children[i].out);
+		}
+
 		code = finish_command(&pp->children[i].process);
 
 		if (opts->task_finished)
diff --git a/run-command.h b/run-command.h
index e3e1ea01ad..b4584c3698 100644
--- a/run-command.h
+++ b/run-command.h
@@ -440,6 +440,21 @@ typedef int (*start_failure_fn)(struct strbuf *out,
 				void *pp_cb,
 				void *pp_task_cb);
 
+/**
+ * This callback is called on every child process that finished processing.
+ *
+ * "struct strbuf *process_out" contains the output from the finished child
+ * process.
+ *
+ * pp_cb is the callback cookie as passed into run_processes_parallel,
+ * pp_task_cb is the callback cookie as passed into get_next_task_fn.
+ *
+ * This function is incompatible with "ungroup"
+ */
+typedef void (*pipe_output_fn)(struct strbuf *process_out,
+			       void *pp_cb,
+			       void *pp_task_cb);
+
 /**
  * This callback is called on every child process that finished processing.
  *
@@ -493,6 +508,12 @@ struct run_process_parallel_opts
 	 */
 	start_failure_fn start_failure;
 
+	/**
+	 * pipe_output: See pipe_output_fn() above. This should be
+	 * NULL unless process specific output is needed
+	 */
+	pipe_output_fn pipe_output;
+
 	/**
 	 * task_finished: See task_finished_fn() above. This can be
 	 * NULL to omit any special handling.
diff --git a/t/helper/test-run-command.c b/t/helper/test-run-command.c
index 3ecb830f4a..e9b41419a0 100644
--- a/t/helper/test-run-command.c
+++ b/t/helper/test-run-command.c
@@ -52,6 +52,13 @@ static int no_job(struct child_process *cp,
 	return 0;
 }
 
+static void pipe_output(struct strbuf *process_out,
+			void *pp_cb,
+			void *pp_task_cb)
+{
+	fprintf(stderr, "%s", process_out->buf);
+}
+
 static int task_finished(int result,
 			 struct strbuf *err,
 			 void *pp_cb,
@@ -439,6 +446,12 @@ int cmd__run_command(int argc, const char **argv)
 		opts.ungroup = 1;
 	}
 
+	if (!strcmp(argv[1], "--pipe-output")) {
+		argv += 1;
+		argc -= 1;
+		opts.pipe_output = pipe_output;
+	}
+
 	jobs = atoi(argv[2]);
 	strvec_clear(&proc.args);
 	strvec_pushv(&proc.args, (const char **)argv + 3);
diff --git a/t/t0061-run-command.sh b/t/t0061-run-command.sh
index 7b5423eebd..e50e57db89 100755
--- a/t/t0061-run-command.sh
+++ b/t/t0061-run-command.sh
@@ -134,6 +134,12 @@ test_expect_success 'run_command runs in parallel with more jobs available than
 	test_cmp expect actual
 '
 
+test_expect_success 'run_command runs in parallel with more jobs available than tasks --pipe-output' '
+	test-tool run-command --pipe-output run-command-parallel 5 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
+	test_must_be_empty out &&
+	test_line_count = 20 err
+'
+
 test_expect_success 'run_command runs ungrouped in parallel with more jobs available than tasks' '
 	test-tool run-command --ungroup run-command-parallel 5 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
 	test_line_count = 8 out &&
@@ -145,6 +151,12 @@ test_expect_success 'run_command runs in parallel with as many jobs as tasks' '
 	test_cmp expect actual
 '
 
+test_expect_success 'run_command runs in parallel with as many jobs as tasks --pipe-output' '
+	test-tool run-command --pipe-output run-command-parallel 4 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
+	test_must_be_empty out &&
+	test_line_count = 20 err
+'
+
 test_expect_success 'run_command runs ungrouped in parallel with as many jobs as tasks' '
 	test-tool run-command --ungroup run-command-parallel 4 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
 	test_line_count = 8 out &&
@@ -156,6 +168,12 @@ test_expect_success 'run_command runs in parallel with more tasks than jobs avai
 	test_cmp expect actual
 '
 
+test_expect_success 'run_command runs in parallel with more tasks than jobs available --pipe-output' '
+	test-tool run-command --pipe-output run-command-parallel 3 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
+	test_must_be_empty out &&
+	test_line_count = 20 err
+'
+
 test_expect_success 'run_command runs ungrouped in parallel with more tasks than jobs available' '
 	test-tool run-command --ungroup run-command-parallel 3 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
 	test_line_count = 8 out &&
@@ -176,6 +194,12 @@ test_expect_success 'run_command is asked to abort gracefully' '
 	test_cmp expect actual
 '
 
+test_expect_success 'run_command is asked to abort gracefully --pipe-output' '
+	test-tool run-command --pipe-output run-command-abort 3 false >out 2>err &&
+	test_must_be_empty out &&
+	test_cmp expect err
+'
+
 test_expect_success 'run_command is asked to abort gracefully (ungroup)' '
 	test-tool run-command --ungroup run-command-abort 3 false >out 2>err &&
 	test_must_be_empty out &&
@@ -191,6 +215,12 @@ test_expect_success 'run_command outputs ' '
 	test_cmp expect actual
 '
 
+test_expect_success 'run_command outputs --pipe-output' '
+	test-tool run-command --pipe-output run-command-no-jobs 3 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
+	test_must_be_empty out &&
+	test_cmp expect err
+'
+
 test_expect_success 'run_command outputs (ungroup) ' '
 	test-tool run-command --ungroup run-command-no-jobs 3 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
 	test_must_be_empty out &&
```
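The data flow added by the patch can be modelled outside of git roughly as follows. This is a simplified sketch under stated assumptions, not the patch itself: `struct buf` stands in for `struct strbuf`, `on_chunk()` mirrors the `pp_buffer_stderr()` hunk, `on_finish()` mirrors the `pp_collect_finished()` hunk, and `count_bytes()` is a hypothetical callback. Unlike the patch's `pipe_output_fn`, the callback here takes a `const` pointer.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for struct strbuf (fixed size for brevity). */
struct buf {
	char data[512];
	size_t len;
};

/* Hypothetical stand-in for one entry of pp->children. */
struct child {
	struct buf err; /* grouped output, printed as before */
	struct buf out; /* copy kept only for the callback */
};

/* Simplified analogue of pipe_output_fn (const, unlike the patch). */
typedef void (*output_cb)(const struct buf *process_out, void *pp_cb,
			  void *pp_task_cb);

static void buf_add(struct buf *b, const char *bytes, size_t n)
{
	assert(b->len + n < sizeof(b->data));
	memcpy(b->data + b->len, bytes, n);
	b->len += n;
	b->data[b->len] = '\0';
}

/* Like the pp_buffer_stderr() hunk: every chunk goes to err, and also to out if a callback is set. */
static void on_chunk(struct child *c, output_cb cb, const char *bytes, size_t n)
{
	buf_add(&c->err, bytes, n);
	if (cb)
		buf_add(&c->out, bytes, n);
}

/* Like the pp_collect_finished() hunk: hand over the copy, then reset it. */
static void on_finish(struct child *c, output_cb cb, void *pp_cb,
		      void *pp_task_cb)
{
	if (!cb)
		return;
	cb(&c->out, pp_cb, pp_task_cb);
	c->out.len = 0;
	c->out.data[0] = '\0';
}

/* Hypothetical example callback: add the byte count to *(size_t *)pp_cb. */
static void count_bytes(const struct buf *process_out, void *pp_cb,
			void *pp_task_cb)
{
	(void)pp_task_cb;
	*(size_t *)pp_cb += process_out->len;
}
```

As in the patch, the callback never touches the grouped `err` buffer; the `out` copy is handed over when the finished child is collected and then reset.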
Add pipe_output_fn as an optionally set function in
run_process_parallel_opts. If set, output from each child process is
first separately stored in 'out' and then piped to the callback
function when the child process finishes to allow for separate parsing.

Two of the tests check for line count rather than an exact match since
the interleaved output order is not guaranteed to be exactly the same
every run through.

Signed-off-by: Calvin Wan <calvinwan@google.com>
---
 run-command.c               | 21 +++++++++++++++++++--
 run-command.h               | 21 +++++++++++++++++++++
 t/helper/test-run-command.c | 13 +++++++++++++
 t/t0061-run-command.sh      | 30 ++++++++++++++++++++++++++++++
 4 files changed, 83 insertions(+), 2 deletions(-)
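For comparison, here is a sketch of the streaming design Ævar proposed and Calvin agreed to adopt, in which the runner keeps no copy of the output at all. Everything in it is hypothetical (`chunk_fn`, `done_fn`, `struct task_state`, `run_one()`): the runner hands each chunk to a callback as it is read, and a caller that only cares about the complete output buffers it in its own per-task state and processes it in its finish callback.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical streaming callbacks: no buffering inside the runner. */
typedef void (*chunk_fn)(const char *chunk, size_t len, void *pp_task_cb);
typedef void (*done_fn)(void *pp_task_cb);

/* Caller-owned per-task state, passed around as the task cookie. */
struct task_state {
	char pending[256]; /* output received so far */
	size_t len;
	size_t lines;      /* result computed once the task is done */
};

/* A caller that wants the whole output simply accumulates it... */
static void buffer_chunk(const char *chunk, size_t len, void *pp_task_cb)
{
	struct task_state *t = pp_task_cb;

	assert(t->len + len < sizeof(t->pending));
	memcpy(t->pending + t->len, chunk, len);
	t->len += len;
	t->pending[t->len] = '\0';
}

/* ...and processes ("flushes") it in its finish callback. */
static void count_lines_when_done(void *pp_task_cb)
{
	struct task_state *t = pp_task_cb;

	t->lines = 0;
	for (size_t i = 0; i < t->len; i++)
		if (t->pending[i] == '\n')
			t->lines++;
	t->len = 0;
	t->pending[0] = '\0';
}

/* Stand-in for the runner: deliver chunks as they "arrive", then finish. */
static void run_one(const char *const *chunks, size_t nchunks,
		    chunk_fn on_chunk, done_fn on_done, void *pp_task_cb)
{
	for (size_t i = 0; i < nchunks; i++)
		on_chunk(chunks[i], strlen(chunks[i]), pp_task_cb);
	on_done(pp_task_cb);
}
```

Because any buffering now lives in the caller's per-task state, the runner would no longer need the `out` buffer in its per-child state, which matches Calvin's remark in his reply.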