@@ -1817,7 +1817,7 @@ static int fetch_multiple(struct string_list *list, int max_children)
result = run_processes_parallel_tr2(max_children,
&fetch_next_remote,
&fetch_failed_to_start,
- NULL,
+ NULL, NULL,
&fetch_finished,
&state,
"fetch", "parallel/fetch");
@@ -2295,7 +2295,7 @@ static int update_submodules(struct submodule_update_clone *suc)
int i;
run_processes_parallel_tr2(suc->max_jobs, update_clone_get_next_task,
- update_clone_start_failure, NULL,
+ update_clone_start_failure, NULL, NULL,
update_clone_task_finished, suc, "submodule",
"parallel/update");
@@ -192,6 +192,7 @@ int run_found_hooks(const char *hook_name, const char *hook_path,
pick_next_hook,
notify_start_failure,
options->feed_pipe,
+ NULL,
notify_hook_finished,
&cb_data,
"hook",
@@ -1494,6 +1494,7 @@ struct parallel_processes {
get_next_task_fn get_next_task;
start_failure_fn start_failure;
feed_pipe_fn feed_pipe;
+ consume_sideband_fn consume_sideband;
task_finished_fn task_finished;
struct {
@@ -1559,6 +1560,7 @@ static void pp_init(struct parallel_processes *pp,
get_next_task_fn get_next_task,
start_failure_fn start_failure,
feed_pipe_fn feed_pipe,
+ consume_sideband_fn consume_sideband,
task_finished_fn task_finished,
void *data)
{
@@ -1579,6 +1581,7 @@ static void pp_init(struct parallel_processes *pp,
pp->start_failure = start_failure ? start_failure : default_start_failure;
pp->feed_pipe = feed_pipe ? feed_pipe : default_feed_pipe;
pp->task_finished = task_finished ? task_finished : default_task_finished;
+ pp->consume_sideband = consume_sideband;
pp->nr_processes = 0;
pp->output_owner = 0;
@@ -1615,7 +1618,10 @@ static void pp_cleanup(struct parallel_processes *pp)
* When get_next_task added messages to the buffer in its last
* iteration, the buffered output is non empty.
*/
- strbuf_write(&pp->buffered_output, stderr);
+ if (pp->consume_sideband)
+ pp->consume_sideband(&pp->buffered_output, pp->data);
+ else
+ strbuf_write(&pp->buffered_output, stderr);
strbuf_release(&pp->buffered_output);
sigchain_pop_common();
@@ -1736,9 +1742,13 @@ static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
static void pp_output(struct parallel_processes *pp)
{
int i = pp->output_owner;
+
if (pp->children[i].state == GIT_CP_WORKING &&
pp->children[i].err.len) {
- strbuf_write(&pp->children[i].err, stderr);
+ if (pp->consume_sideband)
+ pp->consume_sideband(&pp->children[i].err, pp->data);
+ else
+ strbuf_write(&pp->children[i].err, stderr);
strbuf_reset(&pp->children[i].err);
}
}
@@ -1777,11 +1787,15 @@ static int pp_collect_finished(struct parallel_processes *pp)
strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
strbuf_reset(&pp->children[i].err);
} else {
- strbuf_write(&pp->children[i].err, stderr);
+ /* Output errors, then all other finished child processes */
+ if (pp->consume_sideband) {
+ pp->consume_sideband(&pp->children[i].err, pp->data);
+ pp->consume_sideband(&pp->buffered_output, pp->data);
+ } else {
+ strbuf_write(&pp->children[i].err, stderr);
+ strbuf_write(&pp->buffered_output, stderr);
+ }
strbuf_reset(&pp->children[i].err);
-
- /* Output all other finished child processes */
- strbuf_write(&pp->buffered_output, stderr);
strbuf_reset(&pp->buffered_output);
/*
@@ -1805,6 +1819,7 @@ int run_processes_parallel(int n,
get_next_task_fn get_next_task,
start_failure_fn start_failure,
feed_pipe_fn feed_pipe,
+ consume_sideband_fn consume_sideband,
task_finished_fn task_finished,
void *pp_cb)
{
@@ -1815,7 +1830,7 @@ int run_processes_parallel(int n,
sigchain_push(SIGPIPE, SIG_IGN);
- pp_init(&pp, n, get_next_task, start_failure, feed_pipe, task_finished, pp_cb);
+ pp_init(&pp, n, get_next_task, start_failure, feed_pipe, consume_sideband, task_finished, pp_cb);
while (1) {
for (i = 0;
i < spawn_cap && !pp.shutdown &&
@@ -1853,6 +1868,7 @@ int run_processes_parallel(int n,
int run_processes_parallel_tr2(int n, get_next_task_fn get_next_task,
start_failure_fn start_failure,
feed_pipe_fn feed_pipe,
+ consume_sideband_fn consume_sideband,
task_finished_fn task_finished, void *pp_cb,
const char *tr2_category, const char *tr2_label)
{
@@ -1862,7 +1878,8 @@ int run_processes_parallel_tr2(int n, get_next_task_fn get_next_task,
((n < 1) ? online_cpus() : n));
result = run_processes_parallel(n, get_next_task, start_failure,
- feed_pipe, task_finished, pp_cb);
+ feed_pipe, consume_sideband,
+ task_finished, pp_cb);
trace2_region_leave(tr2_category, tr2_label, NULL);
@@ -433,6 +433,20 @@ typedef int (*feed_pipe_fn)(struct strbuf *pipe,
void *pp_cb,
void *pp_task_cb);
+/**
+ * If this callback is provided, instead of collating process output to stderr,
+ * they will be collated into a new pipe. consume_sideband_fn will be called
+ * repeatedly. When output is available on that pipe, it will be contained in
+ * 'output'. But it will be called with an empty 'output' too, to allow for
+ * keepalives or similar operations if necessary.
+ *
+ * pp_cb is the callback cookie as passed into run_processes_parallel.
+ *
+ * Since this callback is provided with the collated output, no task cookie is
+ * provided.
+ */
+typedef void (*consume_sideband_fn)(struct strbuf *output, void *pp_cb);
+
/**
* This callback is called on every child process that finished processing.
*
@@ -468,10 +482,12 @@ int run_processes_parallel(int n,
get_next_task_fn,
start_failure_fn,
feed_pipe_fn,
+ consume_sideband_fn,
task_finished_fn,
void *pp_cb);
int run_processes_parallel_tr2(int n, get_next_task_fn, start_failure_fn,
- feed_pipe_fn, task_finished_fn, void *pp_cb,
+ feed_pipe_fn, consume_sideband_fn,
+ task_finished_fn, void *pp_cb,
const char *tr2_category, const char *tr2_label);
#endif
@@ -1645,7 +1645,7 @@ int fetch_populated_submodules(struct repository *r,
run_processes_parallel_tr2(max_parallel_jobs,
get_next_submodule,
fetch_start_failure,
- NULL,
+ NULL, NULL,
fetch_finish,
&spf,
"submodule", "parallel/fetch");
@@ -51,6 +51,16 @@ static int no_job(struct child_process *cp,
return 0;
}
+static void test_consume_sideband(struct strbuf *output, void *cb)
+{
+ FILE *sideband;
+
+ sideband = fopen("./sideband", "a");
+
+ strbuf_write(output, sideband);
+ fclose(sideband);
+}
+
static int task_finished(int result,
struct strbuf *err,
void *pp_cb,
@@ -201,7 +211,7 @@ static int testsuite(int argc, const char **argv)
suite.tests.nr, max_jobs);
ret = run_processes_parallel(max_jobs, next_test, test_failed,
- test_stdin, test_finished, &suite);
+ test_stdin, NULL, test_finished, &suite);
if (suite.failed.nr > 0) {
ret = 1;
@@ -429,23 +439,28 @@ int cmd__run_command(int argc, const char **argv)
if (!strcmp(argv[1], "run-command-parallel"))
exit(run_processes_parallel(jobs, parallel_next,
- NULL, NULL, NULL, &proc));
+ NULL, NULL, NULL, NULL, &proc));
if (!strcmp(argv[1], "run-command-abort"))
exit(run_processes_parallel(jobs, parallel_next,
- NULL, NULL, task_finished, &proc));
+ NULL, NULL, NULL, task_finished, &proc));
if (!strcmp(argv[1], "run-command-no-jobs"))
exit(run_processes_parallel(jobs, no_job,
- NULL, NULL, task_finished, &proc));
+ NULL, NULL, NULL, task_finished, &proc));
if (!strcmp(argv[1], "run-command-stdin")) {
proc.in = -1;
proc.no_stdin = 0;
exit (run_processes_parallel(jobs, parallel_next, NULL,
- test_stdin, NULL, &proc));
+ test_stdin, NULL, NULL, &proc));
}
+ if (!strcmp(argv[1], "run-command-sideband"))
+ exit(run_processes_parallel(jobs, parallel_next, NULL, NULL,
+ test_consume_sideband, NULL,
+ &proc));
+
fprintf(stderr, "check usage\n");
return 1;
}
@@ -143,6 +143,13 @@ test_expect_success 'run_command runs in parallel with more tasks than jobs avai
test_cmp expect actual
'
+test_expect_success 'run_command can divert output' '
+ test_when_finished rm sideband &&
+ test-tool run-command run-command-sideband 3 sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
+ test_must_be_empty actual &&
+ test_cmp expect sideband
+'
+
cat >expect <<-EOF
preloaded output of a child
listening for stdin: