diff mbox series

[v3,1/6] run-command: add pipe_output_fn to run_processes_parallel_opts

Message ID 20221020232532.1128326-2-calvinwan@google.com (mailing list archive)
State Superseded
Headers show
Series submodule: parallelize diff | expand

Commit Message

Calvin Wan Oct. 20, 2022, 11:25 p.m. UTC
Add pipe_output_fn as an optionally set function in
run_process_parallel_opts. If set, output from each child process is
first separately stored in 'out' and then piped to the callback
function when the child process finishes to allow for separate parsing.

Three of the new tests check the line count rather than an exact match,
since the order of the interleaved output is not guaranteed to be the
same on every run.

Signed-off-by: Calvin Wan <calvinwan@google.com>
---
 run-command.c               | 21 +++++++++++++++++++--
 run-command.h               | 21 +++++++++++++++++++++
 t/helper/test-run-command.c | 13 +++++++++++++
 t/t0061-run-command.sh      | 30 ++++++++++++++++++++++++++++++
 4 files changed, 83 insertions(+), 2 deletions(-)

Comments

Ævar Arnfjörð Bjarmason Oct. 21, 2022, 3:11 a.m. UTC | #1
On Thu, Oct 20 2022, Calvin Wan wrote:

> Add pipe_output_fn as an optionally set function in
> run_process_parallel_opts. If set, output from each child process is
> first separately stored in 'out' and then piped to the callback
> function when the child process finishes to allow for separate parsing.

The "when[...]finish[ed]" here seems a bit odd to me. Why isn't the API
to just stream this to callbacks as it comes in?

Then if a caller only cares about the output at the very end they can
manage that state between their streaming callbacks and "finish"
callback, i.e. buffer it & flush it themselves.

> diff --git a/run-command.c b/run-command.c
> index c772acd743..03787bc7f5 100644
> --- a/run-command.c
> +++ b/run-command.c
> @@ -1503,6 +1503,7 @@ struct parallel_processes {
>  		enum child_state state;
>  		struct child_process process;
>  		struct strbuf err;
> +		struct strbuf out;
>  		void *data;
>  	} *children;
>  	/*
> @@ -1560,6 +1561,9 @@ static void pp_init(struct parallel_processes *pp,
>  
>  	if (!opts->get_next_task)
>  		BUG("you need to specify a get_next_task function");
> +	
> +	if (opts->pipe_output && opts->ungroup)
> +		BUG("pipe_output and ungroup are incompatible with each other");
>  
>  	CALLOC_ARRAY(pp->children, n);
>  	if (!opts->ungroup)
> @@ -1567,6 +1571,8 @@ static void pp_init(struct parallel_processes *pp,
>  
>  	for (size_t i = 0; i < n; i++) {
>  		strbuf_init(&pp->children[i].err, 0);
> +		if (opts->pipe_output)
> +			strbuf_init(&pp->children[i].out, 0);

Even if we're not using this, let's init it for simplicity. We don't use
the "err" with ungroup and we're init-ing that, and...

>  		child_process_init(&pp->children[i].process);
>  		if (pp->pfd) {
>  			pp->pfd[i].events = POLLIN | POLLHUP;
> @@ -1586,6 +1592,7 @@ static void pp_cleanup(struct parallel_processes *pp,
>  	trace_printf("run_processes_parallel: done");
>  	for (size_t i = 0; i < opts->processes; i++) {
>  		strbuf_release(&pp->children[i].err);
> +		strbuf_release(&pp->children[i].out);

...here you're strbuf_release()-ing a string that was never init'd. It's
not segfaulting because we check sb->alloc, and since we calloc'd this
whole thing it'll be 0, but let's just init it so it's a proper strbuf
(with slopbuf). It's cheap.

> +/**
> + * This callback is called on every child process that finished processing.
> + * 
> + * "struct strbuf *process_out" contains the output from the finished child
> + * process.
> + *
> + * pp_cb is the callback cookie as passed into run_processes_parallel,
> + * pp_task_cb is the callback cookie as passed into get_next_task_fn.
> + *
> + * This function is incompatible with "ungroup"
> + */
> +typedef void (*pipe_output_fn)(struct strbuf *process_out,
> +			       void *pp_cb,
> +			       void *pp_task_cb);
> +
>  /**
>   * This callback is called on every child process that finished processing.
>   *
> @@ -493,6 +508,12 @@ struct run_process_parallel_opts
>  	 */
>  	start_failure_fn start_failure;
>  
> +	/**
> +	 * pipe_output: See pipe_output_fn() above. This should be
> +	 * NULL unless process specific output is needed
> +	 */
> +	pipe_output_fn pipe_output;
> +
>  	/**
>  	 * task_finished: See task_finished_fn() above. This can be
>  	 * NULL to omit any special handling.
> diff --git a/t/helper/test-run-command.c b/t/helper/test-run-command.c
> index 3ecb830f4a..e9b41419a0 100644
> --- a/t/helper/test-run-command.c
> +++ b/t/helper/test-run-command.c
> @@ -52,6 +52,13 @@ static int no_job(struct child_process *cp,
>  	return 0;
>  }
>  
> +static void pipe_output(struct strbuf *process_out,
> +			void *pp_cb,
> +			void *pp_task_cb)
> +{
> +	fprintf(stderr, "%s", process_out->buf);

maybe print this with split lines prefixed with something so our tests
can see that something actually happened here, & test-cmp it so we can
see what went where, as opposed to...

> +test_expect_success 'run_command runs in parallel with more jobs available than tasks --pipe-output' '
> +	test-tool run-command --pipe-output run-command-parallel 5 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
> +	test_must_be_empty out &&
> +	test_line_count = 20 err
> +'

...just checking the number of lines, which seems to leave a lot of leeway
for the output to be mixed up in all sorts of ways while the test still
passes.

(ditto below)
Junio C Hamano Oct. 21, 2022, 5:46 a.m. UTC | #2
Calvin Wan <calvinwan@google.com> writes:

> Add pipe_output_fn as an optionally set function in
> run_process_parallel_opts. If set, output from each child process is
> first separately stored in 'out' and then piped to the callback
> function when the child process finishes to allow for separate parsing.

In my review of one of the previous rounds, I asked which part of
this functionality fits the name "pipe", and I do not think I got a
satisfactory answer.  And after re-reading the patch in this round,
with the in-header comments, it still is not clear to me.

It looks more like sending the duplicate of the normal output to a
side channel, somewhat like the "tee" utility, but I am not sure if
that is the intended way to be used.
Calvin Wan Oct. 24, 2022, 5 p.m. UTC | #3
> In my review of one of the previous rounds, I asked which part of
> this functionality fits the name "pipe", and I do not think I got a
> satisfactory answer.  And after re-reading the patch in this round,
> with the in-header comments, it still is not clear to me.
>
> It looks more like sending the duplicate of the normal output to a
> side channel, somewhat like the "tee" utility, but I am not sure if
> that is the intended way to be used.
>

In this case, I was hoping "pipe" would refer to the redirection of
output from the child processes to a separate custom function, but
I can see that duplication != redirection. Maybe something like
"parse_child_output" or "parse_output" would make sense, however,
I didn't want to imply with that name that the only functionality is to
parse output. Besides that, I don't really have any other ideas of
what I can name it...
Calvin Wan Oct. 24, 2022, 5:13 p.m. UTC | #4
On Thu, Oct 20, 2022 at 8:31 PM Ævar Arnfjörð Bjarmason
<avarab@gmail.com> wrote:
>
>
> On Thu, Oct 20 2022, Calvin Wan wrote:
>
> > Add pipe_output_fn as an optionally set function in
> > run_process_parallel_opts. If set, output from each child process is
> > first separately stored in 'out' and then piped to the callback
> > function when the child process finishes to allow for separate parsing.
>
> The "when[...]finish[ed]" here seems a bit odd to me. Why isn't the API
> to just stream this to callbacks as it comes in.
>
> Then if a caller only cares about the output at the very end they can
> manage that state between their streaming callbacks and "finish"
> callback, i.e. buffer it & flush it themselves.

That's a good idea. This also lets me remove the 'out' variable from
parallel_processes.children.

>
> > diff --git a/run-command.c b/run-command.c
> > index c772acd743..03787bc7f5 100644
> > --- a/run-command.c
> > +++ b/run-command.c
> > @@ -1503,6 +1503,7 @@ struct parallel_processes {
> >               enum child_state state;
> >               struct child_process process;
> >               struct strbuf err;
> > +             struct strbuf out;
> >               void *data;
> >       } *children;
> >       /*
> > @@ -1560,6 +1561,9 @@ static void pp_init(struct parallel_processes *pp,
> >
> >       if (!opts->get_next_task)
> >               BUG("you need to specify a get_next_task function");
> > +
> > +     if (opts->pipe_output && opts->ungroup)
> > +             BUG("pipe_output and ungroup are incompatible with each other");
> >
> >       CALLOC_ARRAY(pp->children, n);
> >       if (!opts->ungroup)
> > @@ -1567,6 +1571,8 @@ static void pp_init(struct parallel_processes *pp,
> >
> >       for (size_t i = 0; i < n; i++) {
> >               strbuf_init(&pp->children[i].err, 0);
> > +             if (opts->pipe_output)
> > +                     strbuf_init(&pp->children[i].out, 0);
>
> Even if we're not using this, let's init it for simplicity. We don't use
> the "err" with ungroup and we're init-ing that, and...

ack.

>
> >               child_process_init(&pp->children[i].process);
> >               if (pp->pfd) {
> >                       pp->pfd[i].events = POLLIN | POLLHUP;
> > @@ -1586,6 +1592,7 @@ static void pp_cleanup(struct parallel_processes *pp,
> >       trace_printf("run_processes_parallel: done");
> >       for (size_t i = 0; i < opts->processes; i++) {
> >               strbuf_release(&pp->children[i].err);
> > +             strbuf_release(&pp->children[i].out);
>
> ...here you're strbuf_relese()-ing a string that was never init'd, it's
> not segfaulting because we check sb->alloc, and since we calloc'd this
> whole thing it'll be 0, but let's just init it so it's a proper strbuf
> (with slopbuf). It's cheap.

ack.

> > +/**
> > + * This callback is called on every child process that finished processing.
> > + *
> > + * "struct strbuf *process_out" contains the output from the finished child
> > + * process.
> > + *
> > + * pp_cb is the callback cookie as passed into run_processes_parallel,
> > + * pp_task_cb is the callback cookie as passed into get_next_task_fn.
> > + *
> > + * This function is incompatible with "ungroup"
> > + */
> > +typedef void (*pipe_output_fn)(struct strbuf *process_out,
> > +                            void *pp_cb,
> > +                            void *pp_task_cb);
> > +
> >  /**
> >   * This callback is called on every child process that finished processing.
> >   *
> > @@ -493,6 +508,12 @@ struct run_process_parallel_opts
> >        */
> >       start_failure_fn start_failure;
> >
> > +     /**
> > +      * pipe_output: See pipe_output_fn() above. This should be
> > +      * NULL unless process specific output is needed
> > +      */
> > +     pipe_output_fn pipe_output;
> > +
> >       /**
> >        * task_finished: See task_finished_fn() above. This can be
> >        * NULL to omit any special handling.
> > diff --git a/t/helper/test-run-command.c b/t/helper/test-run-command.c
> > index 3ecb830f4a..e9b41419a0 100644
> > --- a/t/helper/test-run-command.c
> > +++ b/t/helper/test-run-command.c
> > @@ -52,6 +52,13 @@ static int no_job(struct child_process *cp,
> >       return 0;
> >  }
> >
> > +static void pipe_output(struct strbuf *process_out,
> > +                     void *pp_cb,
> > +                     void *pp_task_cb)
> > +{
> > +     fprintf(stderr, "%s", process_out->buf);
>
> maybe print this with split lines prefixed with something so wour tests
> can see that something actually happened here, & test-cmp it so we can
> see what went where, as opposed to...
>
> > +test_expect_success 'run_command runs in parallel with more jobs available than tasks --pipe-output' '
> > +     test-tool run-command --pipe-output run-command-parallel 5 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
> > +     test_must_be_empty out &&
> > +     test_line_count = 20 err
> > +'
>
> Just checking the number of lines, which seems to leave a lot of leeway
> for the output being mixed up in all sorts of ways & the test to still
> pass..
>
> (ditto below)

ack.
Junio C Hamano Oct. 24, 2022, 7:04 p.m. UTC | #5
Calvin Wan <calvinwan@google.com> writes:

>> In my review of one of the previous rounds, I asked which part of
>> this functionality fits the name "pipe", and I do not think I got a
>> satisfactory answer.  And after re-reading the patch in this round,
>> with the in-header comments, it still is not clear to me.
>>
>> It looks more like sending the duplicate of the normal output to a
>> side channel, somewhat like the "tee" utility, but I am not sure if
>> that is the intended way to be used.
>>
>
> In this case, I was hoping "pipe" would refer to the redirection of
> output from the child processes to a separate custom function, but
> I can see that duplication != redirection. Maybe something like
> "parse_child_output" or "parse_output" would make sense, however,
> I didn't want to imply with that name that the only functionality is to
> parse output. Besides that, I don't really have any other ideas of
> what I can name it...

Yeah, parsing is not to the point.  Sending a copy of output to
elsewhere is, so redirect is a better word than parse.  And piping
is not the only form of redirection, either.  If duplication is
really the point, then giving it a name with a word that
signals "duplication" would make more sense.  "send_copy_fn"?  I am
not good at naming.

As a name that is not end-user facing, it is tempting to assume that
the readers have basic knowledge of Unix concepts and call it
"tee_fn", but it would be way too cryptic to the uninitiated, so I
would not recommend it.

Hmm...
Calvin Wan Oct. 25, 2022, 6:51 p.m. UTC | #6
On Mon, Oct 24, 2022 at 12:04 PM Junio C Hamano <gitster@pobox.com> wrote:
>
> Calvin Wan <calvinwan@google.com> writes:
>
> >> In my review of one of the previous rounds, I asked which part of
> >> this functionality fits the name "pipe", and I do not think I got a
> >> satisfactory answer.  And after re-reading the patch in this round,
> >> with the in-header comments, it still is not clear to me.
> >>
> >> It looks more like sending the duplicate of the normal output to a
> >> side channel, somewhat like the "tee" utility, but I am not sure if
> >> that is the intended way to be used.
> >>
> >
> > In this case, I was hoping "pipe" would refer to the redirection of
> > output from the child processes to a separate custom function, but
> > I can see that duplication != redirection. Maybe something like
> > "parse_child_output" or "parse_output" would make sense, however,
> > I didn't want to imply with that name that the only functionality is to
> > parse output. Besides that, I don't really have any other ideas of
> > what I can name it...
>
> Yeah, parsing is not to the point.  Sending a copy of output to
> elsewhere is, so redirect is a better word than parse.  And piping
> is not the only form of redirection, either.  If duplication is
> really the point, then either giving it a name with a word that
> signals "duplication" would make more sense.  "send_copy_fn"?  I am
> not good at naming.
>
> As a name that is not end-user facing, it is tempting to assume that
> the readers have basic knowledge of Unix concepts and call it
> "tee_fn", but it would be way too cryptic to uninitiated, so I would
> not recommend it.
>
> Hmm...

Throwing some more ideas out there:
split_duplicate_fn
duplicate_output_fn
dup_output_fn

As you mention, it's not end-user facing so we should pick a name
that's close enough (and any confusion can always be resolved by
comments)
diff mbox series

Patch

diff --git a/run-command.c b/run-command.c
index c772acd743..03787bc7f5 100644
--- a/run-command.c
+++ b/run-command.c
@@ -1503,6 +1503,7 @@  struct parallel_processes {
 		enum child_state state;
 		struct child_process process;
 		struct strbuf err;
+		struct strbuf out;
 		void *data;
 	} *children;
 	/*
@@ -1560,6 +1561,9 @@  static void pp_init(struct parallel_processes *pp,
 
 	if (!opts->get_next_task)
 		BUG("you need to specify a get_next_task function");
+	
+	if (opts->pipe_output && opts->ungroup)
+		BUG("pipe_output and ungroup are incompatible with each other");
 
 	CALLOC_ARRAY(pp->children, n);
 	if (!opts->ungroup)
@@ -1567,6 +1571,8 @@  static void pp_init(struct parallel_processes *pp,
 
 	for (size_t i = 0; i < n; i++) {
 		strbuf_init(&pp->children[i].err, 0);
+		if (opts->pipe_output)
+			strbuf_init(&pp->children[i].out, 0);
 		child_process_init(&pp->children[i].process);
 		if (pp->pfd) {
 			pp->pfd[i].events = POLLIN | POLLHUP;
@@ -1586,6 +1592,7 @@  static void pp_cleanup(struct parallel_processes *pp,
 	trace_printf("run_processes_parallel: done");
 	for (size_t i = 0; i < opts->processes; i++) {
 		strbuf_release(&pp->children[i].err);
+		strbuf_release(&pp->children[i].out);
 		child_process_clear(&pp->children[i].process);
 	}
 
@@ -1680,8 +1687,12 @@  static void pp_buffer_stderr(struct parallel_processes *pp,
 	for (size_t i = 0; i < opts->processes; i++) {
 		if (pp->children[i].state == GIT_CP_WORKING &&
 		    pp->pfd[i].revents & (POLLIN | POLLHUP)) {
-			int n = strbuf_read_once(&pp->children[i].err,
-						 pp->children[i].process.err, 0);
+			struct strbuf buf = STRBUF_INIT;
+			int n = strbuf_read_once(&buf, pp->children[i].process.err, 0);
+			strbuf_addbuf(&pp->children[i].err, &buf);
+			if (opts->pipe_output)
+				strbuf_addbuf(&pp->children[i].out, &buf);
+			strbuf_release(&buf);
 			if (n == 0) {
 				close(pp->children[i].process.err);
 				pp->children[i].state = GIT_CP_WAIT_CLEANUP;
@@ -1717,6 +1728,12 @@  static int pp_collect_finished(struct parallel_processes *pp,
 		if (i == opts->processes)
 			break;
 
+		if (opts->pipe_output) {
+			opts->pipe_output(&pp->children[i].out, opts->data,
+					  pp->children[i].data);
+			strbuf_reset(&pp->children[i].out);
+		}
+
 		code = finish_command(&pp->children[i].process);
 
 		if (opts->task_finished)
diff --git a/run-command.h b/run-command.h
index e3e1ea01ad..b4584c3698 100644
--- a/run-command.h
+++ b/run-command.h
@@ -440,6 +440,21 @@  typedef int (*start_failure_fn)(struct strbuf *out,
 				void *pp_cb,
 				void *pp_task_cb);
 
+/**
+ * This callback is called on every child process that finished processing.
+ * 
+ * "struct strbuf *process_out" contains the output from the finished child
+ * process.
+ *
+ * pp_cb is the callback cookie as passed into run_processes_parallel,
+ * pp_task_cb is the callback cookie as passed into get_next_task_fn.
+ *
+ * This function is incompatible with "ungroup"
+ */
+typedef void (*pipe_output_fn)(struct strbuf *process_out,
+			       void *pp_cb,
+			       void *pp_task_cb);
+
 /**
  * This callback is called on every child process that finished processing.
  *
@@ -493,6 +508,12 @@  struct run_process_parallel_opts
 	 */
 	start_failure_fn start_failure;
 
+	/**
+	 * pipe_output: See pipe_output_fn() above. This should be
+	 * NULL unless process specific output is needed
+	 */
+	pipe_output_fn pipe_output;
+
 	/**
 	 * task_finished: See task_finished_fn() above. This can be
 	 * NULL to omit any special handling.
diff --git a/t/helper/test-run-command.c b/t/helper/test-run-command.c
index 3ecb830f4a..e9b41419a0 100644
--- a/t/helper/test-run-command.c
+++ b/t/helper/test-run-command.c
@@ -52,6 +52,13 @@  static int no_job(struct child_process *cp,
 	return 0;
 }
 
+static void pipe_output(struct strbuf *process_out,
+			void *pp_cb,
+			void *pp_task_cb)
+{
+	fprintf(stderr, "%s", process_out->buf);
+}
+
 static int task_finished(int result,
 			 struct strbuf *err,
 			 void *pp_cb,
@@ -439,6 +446,12 @@  int cmd__run_command(int argc, const char **argv)
 		opts.ungroup = 1;
 	}
 
+	if (!strcmp(argv[1], "--pipe-output")) {
+		argv += 1;
+		argc -= 1;
+		opts.pipe_output = pipe_output;
+	}
+
 	jobs = atoi(argv[2]);
 	strvec_clear(&proc.args);
 	strvec_pushv(&proc.args, (const char **)argv + 3);
diff --git a/t/t0061-run-command.sh b/t/t0061-run-command.sh
index 7b5423eebd..e50e57db89 100755
--- a/t/t0061-run-command.sh
+++ b/t/t0061-run-command.sh
@@ -134,6 +134,12 @@  test_expect_success 'run_command runs in parallel with more jobs available than
 	test_cmp expect actual
 '
 
+test_expect_success 'run_command runs in parallel with more jobs available than tasks --pipe-output' '
+	test-tool run-command --pipe-output run-command-parallel 5 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
+	test_must_be_empty out &&
+	test_line_count = 20 err
+'
+
 test_expect_success 'run_command runs ungrouped in parallel with more jobs available than tasks' '
 	test-tool run-command --ungroup run-command-parallel 5 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
 	test_line_count = 8 out &&
@@ -145,6 +151,12 @@  test_expect_success 'run_command runs in parallel with as many jobs as tasks' '
 	test_cmp expect actual
 '
 
+test_expect_success 'run_command runs in parallel with as many jobs as tasks --pipe-output' '
+	test-tool run-command --pipe-output run-command-parallel 4 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
+	test_must_be_empty out &&
+	test_line_count = 20 err
+'
+
 test_expect_success 'run_command runs ungrouped in parallel with as many jobs as tasks' '
 	test-tool run-command --ungroup run-command-parallel 4 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
 	test_line_count = 8 out &&
@@ -156,6 +168,12 @@  test_expect_success 'run_command runs in parallel with more tasks than jobs avai
 	test_cmp expect actual
 '
 
+test_expect_success 'run_command runs in parallel with more tasks than jobs available --pipe-output' '
+	test-tool run-command --pipe-output run-command-parallel 3 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
+	test_must_be_empty out &&
+	test_line_count = 20 err
+'
+
 test_expect_success 'run_command runs ungrouped in parallel with more tasks than jobs available' '
 	test-tool run-command --ungroup run-command-parallel 3 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
 	test_line_count = 8 out &&
@@ -176,6 +194,12 @@  test_expect_success 'run_command is asked to abort gracefully' '
 	test_cmp expect actual
 '
 
+test_expect_success 'run_command is asked to abort gracefully --pipe-output' '
+	test-tool run-command --pipe-output run-command-abort 3 false >out 2>err &&
+	test_must_be_empty out &&
+	test_cmp expect err
+'
+
 test_expect_success 'run_command is asked to abort gracefully (ungroup)' '
 	test-tool run-command --ungroup run-command-abort 3 false >out 2>err &&
 	test_must_be_empty out &&
@@ -191,6 +215,12 @@  test_expect_success 'run_command outputs ' '
 	test_cmp expect actual
 '
 
+test_expect_success 'run_command outputs --pipe-output' '
+	test-tool run-command --pipe-output run-command-no-jobs 3 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
+	test_must_be_empty out &&
+	test_cmp expect err
+'
+
 test_expect_success 'run_command outputs (ungroup) ' '
 	test-tool run-command --ungroup run-command-no-jobs 3 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
 	test_must_be_empty out &&