diff mbox series

[v2,5/8] run-command: add an "ungroup" option to run_process_parallel()

Message ID patch-v2-5.8-c2e015ed840-20220518T195858Z-avarab@gmail.com (mailing list archive)
State Superseded
Headers show
Series hook API: connect hooks to the TTY again, fixes a v2.36.0 regression | expand

Commit Message

Ævar Arnfjörð Bjarmason May 18, 2022, 8:05 p.m. UTC
Extend the parallel execution API added in c553c72eed6 (run-command:
add an asynchronous parallel child processor, 2015-12-15) to support a
mode where the stdout and stderr of the processes isn't captured and
output in a deterministic order, instead we'll leave it to the kernel
and stdio to sort it out.

This gives the API same functionality as GNU parallel's --ungroup
option. As we'll see in a subsequent commit the main reason to want
this is to support stdout and stderr being connected to the TTY in the
case of jobs=1, demonstrated here with GNU parallel:

	$ parallel --ungroup 'test -t {} && echo TTY || echo NTTY' ::: 1 2
	TTY
	TTY
	$ parallel 'test -t {} && echo TTY || echo NTTY' ::: 1 2
	NTTY
	NTTY

Another is as GNU parallel's documentation notes a potential for
optimization. Our results will be a bit different, but in cases where
you want to run processes in parallel where the exact order isn't
important this can be a lot faster:

	$ hyperfine -r 3 -L o ,--ungroup 'parallel {o} seq ::: 10000000 >/dev/null '
	Benchmark 1: parallel  seq ::: 10000000 >/dev/null
	  Time (mean ± σ):     220.2 ms ±   9.3 ms    [User: 124.9 ms, System: 96.1 ms]
	  Range (min … max):   212.3 ms … 230.5 ms    3 runs

	Benchmark 2: parallel --ungroup seq ::: 10000000 >/dev/null
	  Time (mean ± σ):     154.7 ms ±   0.9 ms    [User: 136.2 ms, System: 25.1 ms]
	  Range (min … max):   153.9 ms … 155.7 ms    3 runs

	Summary
	  'parallel --ungroup seq ::: 10000000 >/dev/null ' ran
	    1.42 ± 0.06 times faster than 'parallel  seq ::: 10000000 >/dev/null '

A large part of the juggling in the API is to make the API safer for
its maintenance and consumers alike.

For the maintenance of the API we e.g. avoid malloc()-ing the
"pp->pfd", ensuring that SANITIZE=address and other similar tools will
catch any unexpected misuse.

For API consumers we take pains to never pass the non-NULL "out"
buffer to an API user that provided the "ungroup" option. The
resulting code in t/helper/test-run-command.c isn't typical of such a
user, i.e. they'd typically use one mode or the other, and would know
whether they'd provided "ungroup" or not.

Signed-off-by: Ævar Arnfjörð Bjarmason <avarab@gmail.com>
---
 run-command.c               | 76 ++++++++++++++++++++++++++++---------
 run-command.h               | 31 +++++++++++----
 t/helper/test-run-command.c | 26 ++++++++++---
 t/t0061-run-command.sh      | 30 +++++++++++++++
 4 files changed, 132 insertions(+), 31 deletions(-)

Comments

Junio C Hamano May 18, 2022, 9:51 p.m. UTC | #1
Ævar Arnfjörð Bjarmason  <avarab@gmail.com> writes:

> + * If the "ungroup" option isn't specified the callbacks will get a
> + * pointer to a "struct strbuf *out", and must not write to stdout or
> + * stderr as such output will mess up the output of the other parallel
> + * processes. If "ungroup" option is specified callbacks will get a
> + * NULL "struct strbuf *out" parameter, and are responsible for
> + * emitting their own output, including dealing with any race
> + * conditions due to writing in parallel to stdout and stderr.
>   */
>  int run_processes_parallel(struct run_process_parallel_opts *opts);
>  
> diff --git a/t/helper/test-run-command.c b/t/helper/test-run-command.c
> index 56a806f228b..986acbce5f2 100644
> --- a/t/helper/test-run-command.c
> +++ b/t/helper/test-run-command.c
> @@ -31,7 +31,11 @@ static int parallel_next(struct child_process *cp,
>  		return 0;
>  
>  	strvec_pushv(&cp->args, d->args.v);
> -	strbuf_addstr(err, "preloaded output of a child\n");
> +	if (err)
> +		strbuf_addstr(err, "preloaded output of a child\n");
> +	else
> +		fprintf(stderr, "preloaded output of a child\n");
> +

This illustrates the intended use of !err and !!err pretty well ;-).
Emily Shaffer May 26, 2022, 5:18 p.m. UTC | #2
On Wed, May 18, 2022 at 10:05:21PM +0200, Ævar Arnfjörð Bjarmason wrote:
> 
> Extend the parallel execution API added in c553c72eed6 (run-command:
> add an asynchronous parallel child processor, 2015-12-15) to support a
> mode where the stdout and stderr of the processes isn't captured and
> output in a deterministic order, instead we'll leave it to the kernel
> and stdio to sort it out.
> 
> This gives the API same functionality as GNU parallel's --ungroup
> option. As we'll see in a subsequent commit the main reason to want
> this is to support stdout and stderr being connected to the TTY in the
> case of jobs=1, demonstrated here with GNU parallel:
> 
> 	$ parallel --ungroup 'test -t {} && echo TTY || echo NTTY' ::: 1 2
> 	TTY
> 	TTY
> 	$ parallel 'test -t {} && echo TTY || echo NTTY' ::: 1 2
> 	NTTY
> 	NTTY
> 
> Another is as GNU parallel's documentation notes a potential for
> optimization. Our results will be a bit different, but in cases where
> you want to run processes in parallel where the exact order isn't
> important this can be a lot faster:
> 
> 	$ hyperfine -r 3 -L o ,--ungroup 'parallel {o} seq ::: 10000000 >/dev/null '
> 	Benchmark 1: parallel  seq ::: 10000000 >/dev/null
> 	  Time (mean ± σ):     220.2 ms ±   9.3 ms    [User: 124.9 ms, System: 96.1 ms]
> 	  Range (min … max):   212.3 ms … 230.5 ms    3 runs
> 
> 	Benchmark 2: parallel --ungroup seq ::: 10000000 >/dev/null
> 	  Time (mean ± σ):     154.7 ms ±   0.9 ms    [User: 136.2 ms, System: 25.1 ms]
> 	  Range (min … max):   153.9 ms … 155.7 ms    3 runs
> 
> 	Summary
> 	  'parallel --ungroup seq ::: 10000000 >/dev/null ' ran
> 	    1.42 ± 0.06 times faster than 'parallel  seq ::: 10000000 >/dev/null '
> 
> A large part of the juggling in the API is to make the API safer for
> its maintenance and consumers alike.
> 
> For the maintenance of the API we e.g. avoid malloc()-ing the
> "pp->pfd", ensuring that SANITIZE=address and other similar tools will
> catch any unexpected misuse.
> 
> For API consumers we take pains to never pass the non-NULL "out"
> buffer to an API user that provided the "ungroup" option. The
> resulting code in t/helper/test-run-command.c isn't typical of such a
> user, i.e. they'd typically use one mode or the other, and would know
> whether they'd provided "ungroup" or not.

Ah, interesting, so it's a little finer grained than my suggestion of
"just always ungroup when jobs=1". Since it's an option directly
available to the caller, that means they could use it for something
where the ordering really doesn't matter as much as the rate, such as if
they wanted a bunch of subprocesses to split up some work in parallel or
something. I like the flexibility, even though I know before I said "why
don't we hide this functionality". So I was wrong, and this looks nice
to me :)

I think we actually could even automatically set ungroup if jobs=1 as
well, because then there is no reason to buffer the output - it uses
additional memory for us, and it makes output slower to see for the end
user. But I do not really mind enough to want a reroll.

> 
> Signed-off-by: Ævar Arnfjörð Bjarmason <avarab@gmail.com>
> ---
>  run-command.c               | 76 ++++++++++++++++++++++++++++---------
>  run-command.h               | 31 +++++++++++----
>  t/helper/test-run-command.c | 26 ++++++++++---
>  t/t0061-run-command.sh      | 30 +++++++++++++++
>  4 files changed, 132 insertions(+), 31 deletions(-)
> 
> diff --git a/run-command.c b/run-command.c
> index 839c85d12e5..39e09ee39fc 100644
> --- a/run-command.c
> +++ b/run-command.c
> @@ -1468,7 +1468,7 @@ int pipe_command(struct child_process *cmd,
>  enum child_state {
>  	GIT_CP_FREE,
>  	GIT_CP_WORKING,
> -	GIT_CP_WAIT_CLEANUP,
> +	GIT_CP_WAIT_CLEANUP, /* only for !ungroup */
>  };
>  
>  struct parallel_processes {
> @@ -1494,6 +1494,7 @@ struct parallel_processes {
>  	struct pollfd *pfd;
>  
>  	unsigned shutdown : 1;
> +	unsigned ungroup : 1;
>  
>  	int output_owner;
>  	struct strbuf buffered_output; /* of finished children */
> @@ -1563,12 +1564,16 @@ static void pp_init(struct parallel_processes *pp,
>  	pp->nr_processes = 0;
>  	pp->output_owner = 0;
>  	pp->shutdown = 0;
> +	pp->ungroup = opts->ungroup;

I was worried about what happens if the caller changes the value of
run_processes_parallel_opt.ungroup in the middle of execution, but since
we're copying the value away during init, I think it will have no
effect. OK.

>  	CALLOC_ARRAY(pp->children, n);
> -	CALLOC_ARRAY(pp->pfd, n);
> +	if (!pp->ungroup)
> +		CALLOC_ARRAY(pp->pfd, n);
>  
>  	for (i = 0; i < n; i++) {
>  		strbuf_init(&pp->children[i].err, 0);
>  		child_process_init(&pp->children[i].process);
> +		if (!pp->pfd)
> +			continue;
>  		pp->pfd[i].events = POLLIN | POLLHUP;
>  		pp->pfd[i].fd = -1;
>  	}
> @@ -1594,7 +1599,8 @@ static void pp_cleanup(struct parallel_processes *pp)
>  	 * When get_next_task added messages to the buffer in its last
>  	 * iteration, the buffered output is non empty.
>  	 */
> -	strbuf_write(&pp->buffered_output, stderr);
> +	if (!pp->ungroup)
> +		strbuf_write(&pp->buffered_output, stderr);
>  	strbuf_release(&pp->buffered_output);
>  
>  	sigchain_pop_common();
> @@ -1609,6 +1615,7 @@ static void pp_cleanup(struct parallel_processes *pp)
>   */
>  static int pp_start_one(struct parallel_processes *pp)
>  {
> +	const int ungroup = pp->ungroup;
>  	int i, code;
>  
>  	for (i = 0; i < pp->max_processes; i++)
> @@ -1618,24 +1625,30 @@ static int pp_start_one(struct parallel_processes *pp)
>  		BUG("bookkeeping is hard");
>  
>  	code = pp->get_next_task(&pp->children[i].process,
> -				 &pp->children[i].err,
> +				 ungroup ? NULL : &pp->children[i].err,
>  				 pp->data,
>  				 &pp->children[i].data);
>  	if (!code) {
> -		strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
> -		strbuf_reset(&pp->children[i].err);
> +		if (!ungroup) {
> +			strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
> +			strbuf_reset(&pp->children[i].err);
> +		}
>  		return 1;
>  	}
> -	pp->children[i].process.err = -1;
> -	pp->children[i].process.stdout_to_stderr = 1;
> +	if (!ungroup) {
> +		pp->children[i].process.err = -1;
> +		pp->children[i].process.stdout_to_stderr = 1;
> +	}

Hm, so for now if ungroup=1, we ignore the stdout entirely? It looks
like in patch 8 we're relying on the get_next_task callback to set these
instead, right? Or am I misunderstanding it, and the child's stderr goes
to our stderr by default?

>  	pp->children[i].process.no_stdin = 1;
>  
>  	if (start_command(&pp->children[i].process)) {
> -		code = pp->start_failure(&pp->children[i].err,
> +		code = pp->start_failure(ungroup ? NULL : &pp->children[i].err,
>  					 pp->data,
>  					 pp->children[i].data);
> -		strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
> -		strbuf_reset(&pp->children[i].err);
> +		if (!ungroup) {
> +			strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
> +			strbuf_reset(&pp->children[i].err);
> +		}
>  		if (code)
>  			pp->shutdown = 1;
>  		return code;
> @@ -1643,14 +1656,26 @@ static int pp_start_one(struct parallel_processes *pp)
>  
>  	pp->nr_processes++;
>  	pp->children[i].state = GIT_CP_WORKING;
> -	pp->pfd[i].fd = pp->children[i].process.err;
> +	if (pp->pfd)
> +		pp->pfd[i].fd = pp->children[i].process.err;
>  	return 0;
>  }
>  
> +static void pp_mark_working_for_cleanup(struct parallel_processes *pp)
> +{
> +	int i;
> +
> +	for (i = 0; i < pp->max_processes; i++)
> +		if (pp->children[i].state == GIT_CP_WORKING)
> +			pp->children[i].state = GIT_CP_WAIT_CLEANUP;
> +}
> +
>  static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
>  {
>  	int i;
>  
> +	assert(!pp->ungroup);
> +
>  	while ((i = poll(pp->pfd, pp->max_processes, output_timeout)) < 0) {
>  		if (errno == EINTR)
>  			continue;
> @@ -1677,6 +1702,9 @@ static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
>  static void pp_output(struct parallel_processes *pp)
>  {
>  	int i = pp->output_owner;
> +
> +	assert(!pp->ungroup);
> +
>  	if (pp->children[i].state == GIT_CP_WORKING &&
>  	    pp->children[i].err.len) {
>  		strbuf_write(&pp->children[i].err, stderr);
> @@ -1686,10 +1714,15 @@ static void pp_output(struct parallel_processes *pp)
>  
>  static int pp_collect_finished(struct parallel_processes *pp)
>  {
> +	const int ungroup = pp->ungroup;
>  	int i, code;
>  	int n = pp->max_processes;
>  	int result = 0;
>  
> +	if (ungroup)
> +		for (i = 0; i < pp->max_processes; i++)
> +			pp->children[i].state = GIT_CP_WAIT_CLEANUP;
> +
>  	while (pp->nr_processes > 0) {
>  		for (i = 0; i < pp->max_processes; i++)
>  			if (pp->children[i].state == GIT_CP_WAIT_CLEANUP)
> @@ -1700,8 +1733,8 @@ static int pp_collect_finished(struct parallel_processes *pp)
>  		code = finish_command(&pp->children[i].process);
>  
>  		code = pp->task_finished(code,
> -					 &pp->children[i].err, pp->data,
> -					 pp->children[i].data);
> +					 ungroup ? NULL : &pp->children[i].err,
> +					 pp->data, pp->children[i].data);
>  
>  		if (code)
>  			result = code;
> @@ -1710,10 +1743,13 @@ static int pp_collect_finished(struct parallel_processes *pp)
>  
>  		pp->nr_processes--;
>  		pp->children[i].state = GIT_CP_FREE;
> -		pp->pfd[i].fd = -1;
> +		if (pp->pfd)
> +			pp->pfd[i].fd = -1;
>  		child_process_init(&pp->children[i].process);
>  
> -		if (i != pp->output_owner) {
> +		if (ungroup) {
> +			; /* no strbuf_*() work to do here */
> +		} else if (i != pp->output_owner) {
>  			strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
>  			strbuf_reset(&pp->children[i].err);
>  		} else {
> @@ -1774,8 +1810,12 @@ int run_processes_parallel(struct run_process_parallel_opts *opts)
>  		}
>  		if (!pp.nr_processes)
>  			break;
> -		pp_buffer_stderr(&pp, output_timeout);
> -		pp_output(&pp);
> +		if (opts->ungroup) {
> +			pp_mark_working_for_cleanup(&pp);
> +		} else {
> +			pp_buffer_stderr(&pp, output_timeout);
> +			pp_output(&pp);
> +		}
>  		code = pp_collect_finished(&pp);
>  		if (code) {
>  			pp.shutdown = 1;
> diff --git a/run-command.h b/run-command.h
> index b0268ed3db1..dcb6ded4b55 100644
> --- a/run-command.h
> +++ b/run-command.h
> @@ -405,6 +405,10 @@ void check_pipe(int err);
>   * pp_cb is the callback cookie as passed to run_processes_parallel.
>   * You can store a child process specific callback cookie in pp_task_cb.
>   *
> + * The "struct strbuf *err" parameter is either a pointer to a string
> + * to write errors to, or NULL if the "ungroup" option was
> + * provided. See run_processes_parallel() below.
> + *
>   * Even after returning 0 to indicate that there are no more processes,
>   * this function will be called again until there are no more running
>   * child processes.
> @@ -423,9 +427,9 @@ typedef int (*get_next_task_fn)(struct child_process *cp,
>   * This callback is called whenever there are problems starting
>   * a new process.
>   *
> - * You must not write to stdout or stderr in this function. Add your
> - * message to the strbuf out instead, which will be printed without
> - * messing up the output of the other parallel processes.
> + * The "struct strbuf *err" parameter is either a pointer to a string
> + * to write errors to, or NULL if the "ungroup" option was
> + * provided. See run_processes_parallel() below.

I think we are losing some useful information here ("do not write
directly to stdout or stderr from here"). Same for below.

>   *
>   * pp_cb is the callback cookie as passed into run_processes_parallel,
>   * pp_task_cb is the callback cookie as passed into get_next_task_fn.
> @@ -441,9 +445,9 @@ typedef int (*start_failure_fn)(struct strbuf *out,
>  /**
>   * This callback is called on every child process that finished processing.
>   *
> - * You must not write to stdout or stderr in this function. Add your
> - * message to the strbuf out instead, which will be printed without
> - * messing up the output of the other parallel processes.
> + * The "struct strbuf *err" parameter is either a pointer to a string
> + * to write errors to, or NULL if the "ungroup" option was
> + * provided. See run_processes_parallel() below.
>   *
>   * pp_cb is the callback cookie as passed into run_processes_parallel,
>   * pp_task_cb is the callback cookie as passed into get_next_task_fn.
> @@ -466,6 +470,10 @@ typedef int (*task_finished_fn)(int result,
>   *
>   * jobs: see 'n' in run_processes_parallel() below.
>   *
> + * ungroup: Ungroup output. Output is printed as soon as possible and
> + * bypasses run-command's internal processing. This may cause output
> + * from different commands to be mixed.
> + *
>   * *_fn & data: see run_processes_parallel() below.
>   */
>  struct run_process_parallel_opts
> @@ -474,6 +482,7 @@ struct run_process_parallel_opts
>  	const char *tr2_label;
>  
>  	int jobs;
> +	unsigned int ungroup:1;
>  
>  	get_next_task_fn get_next_task;
>  	start_failure_fn start_failure;
> @@ -490,10 +499,18 @@ struct run_process_parallel_opts
>   *
>   * The children started via this function run in parallel. Their output
>   * (both stdout and stderr) is routed to stderr in a manner that output
> - * from different tasks does not interleave.
> + * from different tasks does not interleave (but see "ungroup" above).

Hm, I think it would be more accurate to say, "By default, their output
(blah blah)..." because in fact when we set ungroup, it does interleave.
But this is a small nit, from reading "ungroup" it becomes clear.

>   *
>   * start_failure_fn and task_finished_fn can be NULL to omit any
>   * special handling.
> + *
> + * If the "ungroup" option isn't specified the callbacks will get a
> + * pointer to a "struct strbuf *out", and must not write to stdout or
> + * stderr as such output will mess up the output of the other parallel
> + * processes. If "ungroup" option is specified callbacks will get a
> + * NULL "struct strbuf *out" parameter, and are responsible for
> + * emitting their own output, including dealing with any race
> + * conditions due to writing in parallel to stdout and stderr.
>   */
>  int run_processes_parallel(struct run_process_parallel_opts *opts);
>  
> diff --git a/t/helper/test-run-command.c b/t/helper/test-run-command.c
> index 56a806f228b..986acbce5f2 100644
> --- a/t/helper/test-run-command.c
> +++ b/t/helper/test-run-command.c
> @@ -31,7 +31,11 @@ static int parallel_next(struct child_process *cp,
>  		return 0;
>  
>  	strvec_pushv(&cp->args, d->args.v);
> -	strbuf_addstr(err, "preloaded output of a child\n");
> +	if (err)
> +		strbuf_addstr(err, "preloaded output of a child\n");
> +	else
> +		fprintf(stderr, "preloaded output of a child\n");
> +
>  	number_callbacks++;
>  	return 1;
>  }
> @@ -41,7 +45,10 @@ static int no_job(struct child_process *cp,
>  		  void *cb,
>  		  void **task_cb)
>  {
> -	strbuf_addstr(err, "no further jobs available\n");
> +	if (err)
> +		strbuf_addstr(err, "no further jobs available\n");
> +	else
> +		fprintf(stderr, "no further jobs available\n");
>  	return 0;
>  }
>  
> @@ -50,7 +57,10 @@ static int task_finished(int result,
>  			 void *pp_cb,
>  			 void *pp_task_cb)
>  {
> -	strbuf_addstr(err, "asking for a quick stop\n");
> +	if (err)
> +		strbuf_addstr(err, "asking for a quick stop\n");
> +	else
> +		fprintf(stderr, "asking for a quick stop\n");
>  	return 1;
>  }
>  
> @@ -422,12 +432,15 @@ int cmd__run_command(int argc, const char **argv)
>  	opts.jobs = jobs;
>  	opts.data = &proc;
>  
> -	if (!strcmp(argv[1], "run-command-parallel")) {
> +	if (!strcmp(argv[1], "run-command-parallel") ||
> +	    !strcmp(argv[1], "run-command-parallel-ungroup")) {
>  		next_fn = parallel_next;
> -	} else if (!strcmp(argv[1], "run-command-abort")) {
> +	} else if (!strcmp(argv[1], "run-command-abort") ||
> +		   !strcmp(argv[1], "run-command-abort-ungroup")) {
>  		next_fn = parallel_next;
>  		finished_fn = task_finished;
> -	} else if (!strcmp(argv[1], "run-command-no-jobs")) {
> +	} else if (!strcmp(argv[1], "run-command-no-jobs") ||
> +		   !strcmp(argv[1], "run-command-no-jobs-ungroup")) {
>  		next_fn = no_job;
>  		finished_fn = task_finished;
>  	} else {
> @@ -435,6 +448,7 @@ int cmd__run_command(int argc, const char **argv)
>  		return 1;
>  	}
>  
> +	opts.ungroup = ends_with(argv[1], "-ungroup");
>  	opts.get_next_task = next_fn;
>  	opts.task_finished = finished_fn;
>  	exit(run_processes_parallel(&opts));
> diff --git a/t/t0061-run-command.sh b/t/t0061-run-command.sh
> index 7d00f3cc2af..3628719a06d 100755
> --- a/t/t0061-run-command.sh
> +++ b/t/t0061-run-command.sh
> @@ -135,18 +135,36 @@ test_expect_success 'run_command runs in parallel with more jobs available than
>  	test_cmp expect err
>  '
>  
> +test_expect_success 'run_command runs ungrouped in parallel with more jobs available than tasks' '
> +	test-tool run-command run-command-parallel-ungroup 5 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
> +	test_line_count = 8 out &&
> +	test_line_count = 4 err
> +'
> +
>  test_expect_success 'run_command runs in parallel with as many jobs as tasks' '
>  	test-tool run-command run-command-parallel 4 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
>  	test_must_be_empty out &&
>  	test_cmp expect err
>  '
>  
> +test_expect_success 'run_command runs ungrouped in parallel with as many jobs as tasks' '
> +	test-tool run-command run-command-parallel-ungroup 4 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
> +	test_line_count = 8 out &&
> +	test_line_count = 4 err
> +'
> +
>  test_expect_success 'run_command runs in parallel with more tasks than jobs available' '
>  	test-tool run-command run-command-parallel 3 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
>  	test_must_be_empty out &&
>  	test_cmp expect err
>  '
>  
> +test_expect_success 'run_command runs ungrouped in parallel with more tasks than jobs available' '
> +	test-tool run-command run-command-parallel-ungroup 3 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
> +	test_line_count = 8 out &&
> +	test_line_count = 4 err
> +'
> +
>  cat >expect <<-EOF
>  preloaded output of a child
>  asking for a quick stop
> @@ -162,6 +180,12 @@ test_expect_success 'run_command is asked to abort gracefully' '
>  	test_cmp expect err
>  '
>  
> +test_expect_success 'run_command is asked to abort gracefully (ungroup)' '
> +	test-tool run-command run-command-abort-ungroup 3 false >out 2>err &&
> +	test_must_be_empty out &&
> +	test_line_count = 6 err
> +'
> +
>  cat >expect <<-EOF
>  no further jobs available
>  EOF
> @@ -172,6 +196,12 @@ test_expect_success 'run_command outputs ' '
>  	test_cmp expect err
>  '
>  
> +test_expect_success 'run_command outputs (ungroup) ' '
> +	test-tool run-command run-command-no-jobs-ungroup 3 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
> +	test_must_be_empty out &&
> +	test_cmp expect err
> +'
> +
>  test_trace () {
>  	expect="$1"
>  	shift
> -- 
> 2.36.1.952.g0ae626f6cd7
>
Junio C Hamano May 27, 2022, 4:08 p.m. UTC | #3
Emily Shaffer <emilyshaffer@google.com> writes:

> I think we actually could even automatically set ungroup if jobs=1 as
> well, because then there is no reason to buffer the output - it uses
> additional memory for us, and it makes output slower to see for the end
> user. But I do not really mind enough to want a reroll.

Not doing so would protect us from future end-user complaints,
similar to the way that made us consider the change in 2.36 to be a
regression.  Those who are used to see their stuff run in submodules
(which I recall was the original purpose of run_processes_parallel
was invented for) with their standard output and error streams not
directly connected to the original end-user terminal will start seeing
the expectation broken but only when there is only one submodule, no?

Doing it when an explicit "ungroup" was called for would hopefully
avoid such an inconsistent behaviour.
diff mbox series

Patch

diff --git a/run-command.c b/run-command.c
index 839c85d12e5..39e09ee39fc 100644
--- a/run-command.c
+++ b/run-command.c
@@ -1468,7 +1468,7 @@  int pipe_command(struct child_process *cmd,
 enum child_state {
 	GIT_CP_FREE,
 	GIT_CP_WORKING,
-	GIT_CP_WAIT_CLEANUP,
+	GIT_CP_WAIT_CLEANUP, /* only for !ungroup */
 };
 
 struct parallel_processes {
@@ -1494,6 +1494,7 @@  struct parallel_processes {
 	struct pollfd *pfd;
 
 	unsigned shutdown : 1;
+	unsigned ungroup : 1;
 
 	int output_owner;
 	struct strbuf buffered_output; /* of finished children */
@@ -1563,12 +1564,16 @@  static void pp_init(struct parallel_processes *pp,
 	pp->nr_processes = 0;
 	pp->output_owner = 0;
 	pp->shutdown = 0;
+	pp->ungroup = opts->ungroup;
 	CALLOC_ARRAY(pp->children, n);
-	CALLOC_ARRAY(pp->pfd, n);
+	if (!pp->ungroup)
+		CALLOC_ARRAY(pp->pfd, n);
 
 	for (i = 0; i < n; i++) {
 		strbuf_init(&pp->children[i].err, 0);
 		child_process_init(&pp->children[i].process);
+		if (!pp->pfd)
+			continue;
 		pp->pfd[i].events = POLLIN | POLLHUP;
 		pp->pfd[i].fd = -1;
 	}
@@ -1594,7 +1599,8 @@  static void pp_cleanup(struct parallel_processes *pp)
 	 * When get_next_task added messages to the buffer in its last
 	 * iteration, the buffered output is non empty.
 	 */
-	strbuf_write(&pp->buffered_output, stderr);
+	if (!pp->ungroup)
+		strbuf_write(&pp->buffered_output, stderr);
 	strbuf_release(&pp->buffered_output);
 
 	sigchain_pop_common();
@@ -1609,6 +1615,7 @@  static void pp_cleanup(struct parallel_processes *pp)
  */
 static int pp_start_one(struct parallel_processes *pp)
 {
+	const int ungroup = pp->ungroup;
 	int i, code;
 
 	for (i = 0; i < pp->max_processes; i++)
@@ -1618,24 +1625,30 @@  static int pp_start_one(struct parallel_processes *pp)
 		BUG("bookkeeping is hard");
 
 	code = pp->get_next_task(&pp->children[i].process,
-				 &pp->children[i].err,
+				 ungroup ? NULL : &pp->children[i].err,
 				 pp->data,
 				 &pp->children[i].data);
 	if (!code) {
-		strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
-		strbuf_reset(&pp->children[i].err);
+		if (!ungroup) {
+			strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
+			strbuf_reset(&pp->children[i].err);
+		}
 		return 1;
 	}
-	pp->children[i].process.err = -1;
-	pp->children[i].process.stdout_to_stderr = 1;
+	if (!ungroup) {
+		pp->children[i].process.err = -1;
+		pp->children[i].process.stdout_to_stderr = 1;
+	}
 	pp->children[i].process.no_stdin = 1;
 
 	if (start_command(&pp->children[i].process)) {
-		code = pp->start_failure(&pp->children[i].err,
+		code = pp->start_failure(ungroup ? NULL : &pp->children[i].err,
 					 pp->data,
 					 pp->children[i].data);
-		strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
-		strbuf_reset(&pp->children[i].err);
+		if (!ungroup) {
+			strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
+			strbuf_reset(&pp->children[i].err);
+		}
 		if (code)
 			pp->shutdown = 1;
 		return code;
@@ -1643,14 +1656,26 @@  static int pp_start_one(struct parallel_processes *pp)
 
 	pp->nr_processes++;
 	pp->children[i].state = GIT_CP_WORKING;
-	pp->pfd[i].fd = pp->children[i].process.err;
+	if (pp->pfd)
+		pp->pfd[i].fd = pp->children[i].process.err;
 	return 0;
 }
 
+static void pp_mark_working_for_cleanup(struct parallel_processes *pp)
+{
+	int i;
+
+	for (i = 0; i < pp->max_processes; i++)
+		if (pp->children[i].state == GIT_CP_WORKING)
+			pp->children[i].state = GIT_CP_WAIT_CLEANUP;
+}
+
 static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
 {
 	int i;
 
+	assert(!pp->ungroup);
+
 	while ((i = poll(pp->pfd, pp->max_processes, output_timeout)) < 0) {
 		if (errno == EINTR)
 			continue;
@@ -1677,6 +1702,9 @@  static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
 static void pp_output(struct parallel_processes *pp)
 {
 	int i = pp->output_owner;
+
+	assert(!pp->ungroup);
+
 	if (pp->children[i].state == GIT_CP_WORKING &&
 	    pp->children[i].err.len) {
 		strbuf_write(&pp->children[i].err, stderr);
@@ -1686,10 +1714,15 @@  static void pp_output(struct parallel_processes *pp)
 
 static int pp_collect_finished(struct parallel_processes *pp)
 {
+	const int ungroup = pp->ungroup;
 	int i, code;
 	int n = pp->max_processes;
 	int result = 0;
 
+	if (ungroup)
+		for (i = 0; i < pp->max_processes; i++)
+			pp->children[i].state = GIT_CP_WAIT_CLEANUP;
+
 	while (pp->nr_processes > 0) {
 		for (i = 0; i < pp->max_processes; i++)
 			if (pp->children[i].state == GIT_CP_WAIT_CLEANUP)
@@ -1700,8 +1733,8 @@  static int pp_collect_finished(struct parallel_processes *pp)
 		code = finish_command(&pp->children[i].process);
 
 		code = pp->task_finished(code,
-					 &pp->children[i].err, pp->data,
-					 pp->children[i].data);
+					 ungroup ? NULL : &pp->children[i].err,
+					 pp->data, pp->children[i].data);
 
 		if (code)
 			result = code;
@@ -1710,10 +1743,13 @@  static int pp_collect_finished(struct parallel_processes *pp)
 
 		pp->nr_processes--;
 		pp->children[i].state = GIT_CP_FREE;
-		pp->pfd[i].fd = -1;
+		if (pp->pfd)
+			pp->pfd[i].fd = -1;
 		child_process_init(&pp->children[i].process);
 
-		if (i != pp->output_owner) {
+		if (ungroup) {
+			; /* no strbuf_*() work to do here */
+		} else if (i != pp->output_owner) {
 			strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
 			strbuf_reset(&pp->children[i].err);
 		} else {
@@ -1774,8 +1810,12 @@  int run_processes_parallel(struct run_process_parallel_opts *opts)
 		}
 		if (!pp.nr_processes)
 			break;
-		pp_buffer_stderr(&pp, output_timeout);
-		pp_output(&pp);
+		if (opts->ungroup) {
+			pp_mark_working_for_cleanup(&pp);
+		} else {
+			pp_buffer_stderr(&pp, output_timeout);
+			pp_output(&pp);
+		}
 		code = pp_collect_finished(&pp);
 		if (code) {
 			pp.shutdown = 1;
diff --git a/run-command.h b/run-command.h
index b0268ed3db1..dcb6ded4b55 100644
--- a/run-command.h
+++ b/run-command.h
@@ -405,6 +405,10 @@  void check_pipe(int err);
  * pp_cb is the callback cookie as passed to run_processes_parallel.
  * You can store a child process specific callback cookie in pp_task_cb.
  *
+ * The "struct strbuf *err" parameter is either a pointer to a string
+ * to write errors to, or NULL if the "ungroup" option was
+ * provided. See run_processes_parallel() below.
+ *
  * Even after returning 0 to indicate that there are no more processes,
  * this function will be called again until there are no more running
  * child processes.
@@ -423,9 +427,9 @@  typedef int (*get_next_task_fn)(struct child_process *cp,
  * This callback is called whenever there are problems starting
  * a new process.
  *
- * You must not write to stdout or stderr in this function. Add your
- * message to the strbuf out instead, which will be printed without
- * messing up the output of the other parallel processes.
+ * The "struct strbuf *err" parameter is either a pointer to a string
+ * to write errors to, or NULL if the "ungroup" option was
+ * provided. See run_processes_parallel() below.
  *
  * pp_cb is the callback cookie as passed into run_processes_parallel,
  * pp_task_cb is the callback cookie as passed into get_next_task_fn.
@@ -441,9 +445,9 @@  typedef int (*start_failure_fn)(struct strbuf *out,
 /**
  * This callback is called on every child process that finished processing.
  *
- * You must not write to stdout or stderr in this function. Add your
- * message to the strbuf out instead, which will be printed without
- * messing up the output of the other parallel processes.
+ * The "struct strbuf *err" parameter is either a pointer to a string
+ * to write errors to, or NULL if the "ungroup" option was
+ * provided. See run_processes_parallel() below.
  *
  * pp_cb is the callback cookie as passed into run_processes_parallel,
  * pp_task_cb is the callback cookie as passed into get_next_task_fn.
@@ -466,6 +470,10 @@  typedef int (*task_finished_fn)(int result,
  *
  * jobs: see 'n' in run_processes_parallel() below.
  *
+ * ungroup: Ungroup output. Output is printed as soon as possible and
+ * bypasses run-command's internal processing. This may cause output
+ * from different commands to be mixed.
+ *
  * *_fn & data: see run_processes_parallel() below.
  */
 struct run_process_parallel_opts
@@ -474,6 +482,7 @@  struct run_process_parallel_opts
 	const char *tr2_label;
 
 	int jobs;
+	unsigned int ungroup:1;
 
 	get_next_task_fn get_next_task;
 	start_failure_fn start_failure;
@@ -490,10 +499,18 @@  struct run_process_parallel_opts
  *
  * The children started via this function run in parallel. Their output
  * (both stdout and stderr) is routed to stderr in a manner that output
- * from different tasks does not interleave.
+ * from different tasks does not interleave (but see "ungroup" above).
  *
  * start_failure_fn and task_finished_fn can be NULL to omit any
  * special handling.
+ *
+ * If the "ungroup" option isn't specified the callbacks will get a
+ * pointer to a "struct strbuf *out", and must not write to stdout or
+ * stderr as such output will mess up the output of the other parallel
+ * processes. If "ungroup" option is specified callbacks will get a
+ * NULL "struct strbuf *out" parameter, and are responsible for
+ * emitting their own output, including dealing with any race
+ * conditions due to writing in parallel to stdout and stderr.
  */
 int run_processes_parallel(struct run_process_parallel_opts *opts);
 
diff --git a/t/helper/test-run-command.c b/t/helper/test-run-command.c
index 56a806f228b..986acbce5f2 100644
--- a/t/helper/test-run-command.c
+++ b/t/helper/test-run-command.c
@@ -31,7 +31,11 @@  static int parallel_next(struct child_process *cp,
 		return 0;
 
 	strvec_pushv(&cp->args, d->args.v);
-	strbuf_addstr(err, "preloaded output of a child\n");
+	if (err)
+		strbuf_addstr(err, "preloaded output of a child\n");
+	else
+		fprintf(stderr, "preloaded output of a child\n");
+
 	number_callbacks++;
 	return 1;
 }
@@ -41,7 +45,10 @@  static int no_job(struct child_process *cp,
 		  void *cb,
 		  void **task_cb)
 {
-	strbuf_addstr(err, "no further jobs available\n");
+	if (err)
+		strbuf_addstr(err, "no further jobs available\n");
+	else
+		fprintf(stderr, "no further jobs available\n");
 	return 0;
 }
 
@@ -50,7 +57,10 @@  static int task_finished(int result,
 			 void *pp_cb,
 			 void *pp_task_cb)
 {
-	strbuf_addstr(err, "asking for a quick stop\n");
+	if (err)
+		strbuf_addstr(err, "asking for a quick stop\n");
+	else
+		fprintf(stderr, "asking for a quick stop\n");
 	return 1;
 }
 
@@ -422,12 +432,15 @@  int cmd__run_command(int argc, const char **argv)
 	opts.jobs = jobs;
 	opts.data = &proc;
 
-	if (!strcmp(argv[1], "run-command-parallel")) {
+	if (!strcmp(argv[1], "run-command-parallel") ||
+	    !strcmp(argv[1], "run-command-parallel-ungroup")) {
 		next_fn = parallel_next;
-	} else if (!strcmp(argv[1], "run-command-abort")) {
+	} else if (!strcmp(argv[1], "run-command-abort") ||
+		   !strcmp(argv[1], "run-command-abort-ungroup")) {
 		next_fn = parallel_next;
 		finished_fn = task_finished;
-	} else if (!strcmp(argv[1], "run-command-no-jobs")) {
+	} else if (!strcmp(argv[1], "run-command-no-jobs") ||
+		   !strcmp(argv[1], "run-command-no-jobs-ungroup")) {
 		next_fn = no_job;
 		finished_fn = task_finished;
 	} else {
@@ -435,6 +448,7 @@  int cmd__run_command(int argc, const char **argv)
 		return 1;
 	}
 
+	opts.ungroup = ends_with(argv[1], "-ungroup");
 	opts.get_next_task = next_fn;
 	opts.task_finished = finished_fn;
 	exit(run_processes_parallel(&opts));
diff --git a/t/t0061-run-command.sh b/t/t0061-run-command.sh
index 7d00f3cc2af..3628719a06d 100755
--- a/t/t0061-run-command.sh
+++ b/t/t0061-run-command.sh
@@ -135,18 +135,36 @@  test_expect_success 'run_command runs in parallel with more jobs available than
 	test_cmp expect err
 '
 
+test_expect_success 'run_command runs ungrouped in parallel with more jobs available than tasks' '
+	test-tool run-command run-command-parallel-ungroup 5 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
+	test_line_count = 8 out &&
+	test_line_count = 4 err
+'
+
 test_expect_success 'run_command runs in parallel with as many jobs as tasks' '
 	test-tool run-command run-command-parallel 4 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
 	test_must_be_empty out &&
 	test_cmp expect err
 '
 
+test_expect_success 'run_command runs ungrouped in parallel with as many jobs as tasks' '
+	test-tool run-command run-command-parallel-ungroup 4 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
+	test_line_count = 8 out &&
+	test_line_count = 4 err
+'
+
 test_expect_success 'run_command runs in parallel with more tasks than jobs available' '
 	test-tool run-command run-command-parallel 3 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
 	test_must_be_empty out &&
 	test_cmp expect err
 '
 
+test_expect_success 'run_command runs ungrouped in parallel with more tasks than jobs available' '
+	test-tool run-command run-command-parallel-ungroup 3 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
+	test_line_count = 8 out &&
+	test_line_count = 4 err
+'
+
 cat >expect <<-EOF
 preloaded output of a child
 asking for a quick stop
@@ -162,6 +180,12 @@  test_expect_success 'run_command is asked to abort gracefully' '
 	test_cmp expect err
 '
 
+test_expect_success 'run_command is asked to abort gracefully (ungroup)' '
+	test-tool run-command run-command-abort-ungroup 3 false >out 2>err &&
+	test_must_be_empty out &&
+	test_line_count = 6 err
+'
+
 cat >expect <<-EOF
 no further jobs available
 EOF
@@ -172,6 +196,12 @@  test_expect_success 'run_command outputs ' '
 	test_cmp expect err
 '
 
+test_expect_success 'run_command outputs (ungroup) ' '
+	test-tool run-command run-command-no-jobs-ungroup 3 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
+	test_must_be_empty out &&
+	test_cmp expect err
+'
+
 test_trace () {
 	expect="$1"
 	shift