@@ -1819,6 +1819,7 @@ static int fetch_multiple(struct string_list *list, int max_children)
result = run_processes_parallel_tr2(max_children,
&fetch_next_remote,
&fetch_failed_to_start,
+ NULL,
&fetch_finished,
&state,
"fetch", "parallel/fetch");
@@ -2311,7 +2311,7 @@ static int update_submodules(struct submodule_update_clone *suc)
int i;
run_processes_parallel_tr2(suc->max_jobs, update_clone_get_next_task,
- update_clone_start_failure,
+ update_clone_start_failure, NULL,
update_clone_task_finished, suc, "submodule",
"parallel/update");
@@ -146,6 +146,7 @@ int run_hooks(const char *hook_name, const char *hook_path,
run_processes_parallel_tr2(jobs,
pick_next_hook,
notify_start_failure,
+ NULL,
notify_hook_finished,
&cb_data,
"hook",
@@ -1492,6 +1492,7 @@ struct parallel_processes {
get_next_task_fn get_next_task;
start_failure_fn start_failure;
+ feed_pipe_fn feed_pipe;
task_finished_fn task_finished;
struct {
@@ -1519,6 +1520,13 @@ static int default_start_failure(struct strbuf *out,
return 0;
}
+static int default_feed_pipe(struct strbuf *pipe,
+ void *pp_cb,
+ void *pp_task_cb)
+{
+ return 1;
+}
+
static int default_task_finished(int result,
struct strbuf *out,
void *pp_cb,
@@ -1549,6 +1557,7 @@ static void pp_init(struct parallel_processes *pp,
int n,
get_next_task_fn get_next_task,
start_failure_fn start_failure,
+ feed_pipe_fn feed_pipe,
task_finished_fn task_finished,
void *data)
{
@@ -1567,6 +1576,7 @@ static void pp_init(struct parallel_processes *pp,
pp->get_next_task = get_next_task;
pp->start_failure = start_failure ? start_failure : default_start_failure;
+ pp->feed_pipe = feed_pipe ? feed_pipe : default_feed_pipe;
pp->task_finished = task_finished ? task_finished : default_task_finished;
pp->nr_processes = 0;
@@ -1664,6 +1674,37 @@ static int pp_start_one(struct parallel_processes *pp)
return 0;
}
+static void pp_buffer_stdin(struct parallel_processes *pp)
+{
+ int i;
+ struct strbuf sb = STRBUF_INIT;
+
+ /* Buffer stdin for each pipe. */
+ for (i = 0; i < pp->max_processes; i++) {
+ if (pp->children[i].state == GIT_CP_WORKING &&
+ pp->children[i].process.in > 0) {
+ int done;
+ strbuf_reset(&sb);
+ done = pp->feed_pipe(&sb, pp->data,
+ pp->children[i].data);
+ if (sb.len) {
+ if (write_in_full(pp->children[i].process.in,
+ sb.buf, sb.len) < 0) {
+ if (errno != EPIPE)
+ die_errno("write");
+ done = 1;
+ }
+ }
+ if (done) {
+ close(pp->children[i].process.in);
+ pp->children[i].process.in = 0;
+ }
+ }
+ }
+
+ strbuf_release(&sb);
+}
+
static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
{
int i;
@@ -1728,6 +1769,7 @@ static int pp_collect_finished(struct parallel_processes *pp)
pp->nr_processes--;
pp->children[i].state = GIT_CP_FREE;
pp->pfd[i].fd = -1;
+ pp->children[i].process.in = 0;
child_process_init(&pp->children[i].process);
if (i != pp->output_owner) {
@@ -1761,6 +1803,7 @@ static int pp_collect_finished(struct parallel_processes *pp)
int run_processes_parallel(int n,
get_next_task_fn get_next_task,
start_failure_fn start_failure,
+ feed_pipe_fn feed_pipe,
task_finished_fn task_finished,
void *pp_cb)
{
@@ -1769,7 +1812,9 @@ int run_processes_parallel(int n,
int spawn_cap = 4;
struct parallel_processes pp;
- pp_init(&pp, n, get_next_task, start_failure, task_finished, pp_cb);
+ sigchain_push(SIGPIPE, SIG_IGN);
+
+ pp_init(&pp, n, get_next_task, start_failure, feed_pipe, task_finished, pp_cb);
while (1) {
for (i = 0;
i < spawn_cap && !pp.shutdown &&
@@ -1786,6 +1831,7 @@ int run_processes_parallel(int n,
}
if (!pp.nr_processes)
break;
+ pp_buffer_stdin(&pp);
pp_buffer_stderr(&pp, output_timeout);
pp_output(&pp);
code = pp_collect_finished(&pp);
@@ -1797,11 +1843,15 @@ int run_processes_parallel(int n,
}
pp_cleanup(&pp);
+
+ sigchain_pop(SIGPIPE);
+
return 0;
}
int run_processes_parallel_tr2(int n, get_next_task_fn get_next_task,
start_failure_fn start_failure,
+ feed_pipe_fn feed_pipe,
task_finished_fn task_finished, void *pp_cb,
const char *tr2_category, const char *tr2_label)
{
@@ -1811,7 +1861,7 @@ int run_processes_parallel_tr2(int n, get_next_task_fn get_next_task,
((n < 1) ? online_cpus() : n));
result = run_processes_parallel(n, get_next_task, start_failure,
- task_finished, pp_cb);
+ feed_pipe, task_finished, pp_cb);
trace2_region_leave(tr2_category, tr2_label, NULL);
@@ -422,6 +422,20 @@ typedef int (*start_failure_fn)(struct strbuf *out,
void *pp_cb,
void *pp_task_cb);
+/**
+ * This callback is called repeatedly on every child process who requests
+ * start_command() to create a pipe by setting child_process.in < 0.
+ *
+ * pp_cb is the callback cookie as passed into run_processes_parallel, and
+ * pp_task_cb is the callback cookie as passed into get_next_task_fn.
+ * The contents of 'send' will be read into the pipe and passed to the pipe.
+ *
+ * Return nonzero to close the pipe.
+ */
+typedef int (*feed_pipe_fn)(struct strbuf *pipe,
+ void *pp_cb,
+ void *pp_task_cb);
+
/**
* This callback is called on every child process that finished processing.
*
@@ -456,10 +470,11 @@ typedef int (*task_finished_fn)(int result,
int run_processes_parallel(int n,
get_next_task_fn,
start_failure_fn,
+ feed_pipe_fn,
task_finished_fn,
void *pp_cb);
int run_processes_parallel_tr2(int n, get_next_task_fn, start_failure_fn,
- task_finished_fn, void *pp_cb,
+ feed_pipe_fn, task_finished_fn, void *pp_cb,
const char *tr2_category, const char *tr2_label);
/**
@@ -1632,6 +1632,7 @@ int fetch_populated_submodules(struct repository *r,
run_processes_parallel_tr2(max_parallel_jobs,
get_next_submodule,
fetch_start_failure,
+ NULL,
fetch_finish,
&spf,
"submodule", "parallel/fetch");
@@ -32,8 +32,13 @@ static int parallel_next(struct child_process *cp,
return 0;
strvec_pushv(&cp->args, d->argv);
+ cp->in = d->in;
+ cp->no_stdin = d->no_stdin;
strbuf_addstr(err, "preloaded output of a child\n");
number_callbacks++;
+
+ *task_cb = xmalloc(sizeof(int));
+ *(int*)(*task_cb) = 2;
return 1;
}
@@ -55,6 +60,17 @@ static int task_finished(int result,
return 1;
}
+static int test_stdin(struct strbuf *pipe, void *cb, void *task_cb)
+{
+ int *lines_remaining = task_cb;
+
+ if (*lines_remaining)
+ strbuf_addf(pipe, "sample stdin %d\n", --(*lines_remaining));
+
+ return !(*lines_remaining);
+}
+
+
struct testsuite {
struct string_list tests, failed;
int next;
@@ -185,7 +201,7 @@ static int testsuite(int argc, const char **argv)
suite.tests.nr, max_jobs);
ret = run_processes_parallel(max_jobs, next_test, test_failed,
- test_finished, &suite);
+ test_stdin, test_finished, &suite);
if (suite.failed.nr > 0) {
ret = 1;
@@ -413,15 +429,22 @@ int cmd__run_command(int argc, const char **argv)
if (!strcmp(argv[1], "run-command-parallel"))
exit(run_processes_parallel(jobs, parallel_next,
- NULL, NULL, &proc));
+ NULL, NULL, NULL, &proc));
if (!strcmp(argv[1], "run-command-abort"))
exit(run_processes_parallel(jobs, parallel_next,
- NULL, task_finished, &proc));
+ NULL, NULL, task_finished, &proc));
if (!strcmp(argv[1], "run-command-no-jobs"))
exit(run_processes_parallel(jobs, no_job,
- NULL, task_finished, &proc));
+ NULL, NULL, task_finished, &proc));
+
+ if (!strcmp(argv[1], "run-command-stdin")) {
+ proc.in = -1;
+ proc.no_stdin = 0;
+ exit (run_processes_parallel(jobs, parallel_next, NULL,
+ test_stdin, NULL, &proc));
+ }
fprintf(stderr, "check usage\n");
return 1;
@@ -143,6 +143,36 @@ test_expect_success 'run_command runs in parallel with more tasks than jobs avai
test_cmp expect actual
'
+cat >expect <<-EOF
+preloaded output of a child
+listening for stdin:
+sample stdin 1
+sample stdin 0
+preloaded output of a child
+listening for stdin:
+sample stdin 1
+sample stdin 0
+preloaded output of a child
+listening for stdin:
+sample stdin 1
+sample stdin 0
+preloaded output of a child
+listening for stdin:
+sample stdin 1
+sample stdin 0
+EOF
+
+test_expect_success 'run_command listens to stdin' '
+ write_script stdin-script <<-\EOF &&
+ echo "listening for stdin:"
+ while read line; do
+ echo "$line"
+ done
+ EOF
+ test-tool run-command run-command-stdin 2 ./stdin-script 2>actual &&
+ test_cmp expect actual
+'
+
cat >expect <<-EOF
preloaded output of a child
asking for a quick stop