Merge branch 'ab/run-hook-api-cleanup'
Move a global variable added as a hack during regression fixes to its proper place in the API. * ab/run-hook-api-cleanup: run-command.c: remove "max_processes", add "const" to signal() handler run-command.c: pass "opts" further down, and use "opts->processes" run-command.c: use "opts->processes", not "pp->max_processes" run-command.c: don't copy "data" to "struct parallel_processes" run-command.c: don't copy "ungroup" to "struct parallel_processes" run-command.c: don't copy *_fn to "struct parallel_processes" run-command.c: make "struct parallel_processes" const if possible run-command API: move *_tr2() users to "run_processes_parallel()" run-command API: have run_process_parallel() take an "opts" struct run-command.c: use designated init for pp_init(), add "const" run-command API: don't fall back on online_cpus() run-command API: make "n" parameter a "size_t" run-command tests: use "return", not "exit" run-command API: have "run_processes_parallel{,_tr2}()" return void run-command test helper: use "else if" pattern
This commit is contained in:
commit
6ae1a6eaf2
@ -122,6 +122,8 @@ static int git_fetch_config(const char *k, const char *v, void *cb)
|
|||||||
fetch_parallel_config = git_config_int(k, v);
|
fetch_parallel_config = git_config_int(k, v);
|
||||||
if (fetch_parallel_config < 0)
|
if (fetch_parallel_config < 0)
|
||||||
die(_("fetch.parallel cannot be negative"));
|
die(_("fetch.parallel cannot be negative"));
|
||||||
|
if (!fetch_parallel_config)
|
||||||
|
fetch_parallel_config = online_cpus();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1951,17 +1953,22 @@ static int fetch_multiple(struct string_list *list, int max_children)
|
|||||||
|
|
||||||
if (max_children != 1 && list->nr != 1) {
|
if (max_children != 1 && list->nr != 1) {
|
||||||
struct parallel_fetch_state state = { argv.v, list, 0, 0 };
|
struct parallel_fetch_state state = { argv.v, list, 0, 0 };
|
||||||
|
const struct run_process_parallel_opts opts = {
|
||||||
|
.tr2_category = "fetch",
|
||||||
|
.tr2_label = "parallel/fetch",
|
||||||
|
|
||||||
|
.processes = max_children,
|
||||||
|
|
||||||
|
.get_next_task = &fetch_next_remote,
|
||||||
|
.start_failure = &fetch_failed_to_start,
|
||||||
|
.task_finished = &fetch_finished,
|
||||||
|
.data = &state,
|
||||||
|
};
|
||||||
|
|
||||||
strvec_push(&argv, "--end-of-options");
|
strvec_push(&argv, "--end-of-options");
|
||||||
result = run_processes_parallel_tr2(max_children,
|
|
||||||
&fetch_next_remote,
|
|
||||||
&fetch_failed_to_start,
|
|
||||||
&fetch_finished,
|
|
||||||
&state,
|
|
||||||
"fetch", "parallel/fetch");
|
|
||||||
|
|
||||||
if (!result)
|
run_processes_parallel(&opts);
|
||||||
result = state.result;
|
result = state.result;
|
||||||
} else
|
} else
|
||||||
for (i = 0; i < list->nr; i++) {
|
for (i = 0; i < list->nr; i++) {
|
||||||
const char *name = list->items[i].string;
|
const char *name = list->items[i].string;
|
||||||
|
@ -2567,12 +2567,20 @@ static int update_submodules(struct update_data *update_data)
|
|||||||
{
|
{
|
||||||
int i, ret = 0;
|
int i, ret = 0;
|
||||||
struct submodule_update_clone suc = SUBMODULE_UPDATE_CLONE_INIT;
|
struct submodule_update_clone suc = SUBMODULE_UPDATE_CLONE_INIT;
|
||||||
|
const struct run_process_parallel_opts opts = {
|
||||||
|
.tr2_category = "submodule",
|
||||||
|
.tr2_label = "parallel/update",
|
||||||
|
|
||||||
|
.processes = update_data->max_jobs,
|
||||||
|
|
||||||
|
.get_next_task = update_clone_get_next_task,
|
||||||
|
.start_failure = update_clone_start_failure,
|
||||||
|
.task_finished = update_clone_task_finished,
|
||||||
|
.data = &suc,
|
||||||
|
};
|
||||||
|
|
||||||
suc.update_data = update_data;
|
suc.update_data = update_data;
|
||||||
run_processes_parallel_tr2(suc.update_data->max_jobs, update_clone_get_next_task,
|
run_processes_parallel(&opts);
|
||||||
update_clone_start_failure,
|
|
||||||
update_clone_task_finished, &suc, "submodule",
|
|
||||||
"parallel/update");
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We saved the output and put it out all at once now.
|
* We saved the output and put it out all at once now.
|
||||||
|
23
hook.c
23
hook.c
@ -114,8 +114,20 @@ int run_hooks_opt(const char *hook_name, struct run_hooks_opt *options)
|
|||||||
.options = options,
|
.options = options,
|
||||||
};
|
};
|
||||||
const char *const hook_path = find_hook(hook_name);
|
const char *const hook_path = find_hook(hook_name);
|
||||||
int jobs = 1;
|
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
|
const struct run_process_parallel_opts opts = {
|
||||||
|
.tr2_category = "hook",
|
||||||
|
.tr2_label = hook_name,
|
||||||
|
|
||||||
|
.processes = 1,
|
||||||
|
.ungroup = 1,
|
||||||
|
|
||||||
|
.get_next_task = pick_next_hook,
|
||||||
|
.start_failure = notify_start_failure,
|
||||||
|
.task_finished = notify_hook_finished,
|
||||||
|
|
||||||
|
.data = &cb_data,
|
||||||
|
};
|
||||||
|
|
||||||
if (!options)
|
if (!options)
|
||||||
BUG("a struct run_hooks_opt must be provided to run_hooks");
|
BUG("a struct run_hooks_opt must be provided to run_hooks");
|
||||||
@ -137,14 +149,7 @@ int run_hooks_opt(const char *hook_name, struct run_hooks_opt *options)
|
|||||||
cb_data.hook_path = abs_path.buf;
|
cb_data.hook_path = abs_path.buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
run_processes_parallel_ungroup = 1;
|
run_processes_parallel(&opts);
|
||||||
run_processes_parallel_tr2(jobs,
|
|
||||||
pick_next_hook,
|
|
||||||
notify_start_failure,
|
|
||||||
notify_hook_finished,
|
|
||||||
&cb_data,
|
|
||||||
"hook",
|
|
||||||
hook_name);
|
|
||||||
ret = cb_data.rc;
|
ret = cb_data.rc;
|
||||||
cleanup:
|
cleanup:
|
||||||
strbuf_release(&abs_path);
|
strbuf_release(&abs_path);
|
||||||
|
234
run-command.c
234
run-command.c
@ -1496,16 +1496,8 @@ enum child_state {
|
|||||||
GIT_CP_WAIT_CLEANUP,
|
GIT_CP_WAIT_CLEANUP,
|
||||||
};
|
};
|
||||||
|
|
||||||
int run_processes_parallel_ungroup;
|
|
||||||
struct parallel_processes {
|
struct parallel_processes {
|
||||||
void *data;
|
size_t nr_processes;
|
||||||
|
|
||||||
int max_processes;
|
|
||||||
int nr_processes;
|
|
||||||
|
|
||||||
get_next_task_fn get_next_task;
|
|
||||||
start_failure_fn start_failure;
|
|
||||||
task_finished_fn task_finished;
|
|
||||||
|
|
||||||
struct {
|
struct {
|
||||||
enum child_state state;
|
enum child_state state;
|
||||||
@ -1520,81 +1512,60 @@ struct parallel_processes {
|
|||||||
struct pollfd *pfd;
|
struct pollfd *pfd;
|
||||||
|
|
||||||
unsigned shutdown : 1;
|
unsigned shutdown : 1;
|
||||||
unsigned ungroup : 1;
|
|
||||||
|
|
||||||
int output_owner;
|
size_t output_owner;
|
||||||
struct strbuf buffered_output; /* of finished children */
|
struct strbuf buffered_output; /* of finished children */
|
||||||
};
|
};
|
||||||
|
|
||||||
static int default_start_failure(struct strbuf *out,
|
struct parallel_processes_for_signal {
|
||||||
void *pp_cb,
|
const struct run_process_parallel_opts *opts;
|
||||||
void *pp_task_cb)
|
const struct parallel_processes *pp;
|
||||||
{
|
};
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int default_task_finished(int result,
|
static void kill_children(const struct parallel_processes *pp,
|
||||||
struct strbuf *out,
|
const struct run_process_parallel_opts *opts,
|
||||||
void *pp_cb,
|
int signo)
|
||||||
void *pp_task_cb)
|
|
||||||
{
|
{
|
||||||
return 0;
|
for (size_t i = 0; i < opts->processes; i++)
|
||||||
}
|
|
||||||
|
|
||||||
static void kill_children(struct parallel_processes *pp, int signo)
|
|
||||||
{
|
|
||||||
int i, n = pp->max_processes;
|
|
||||||
|
|
||||||
for (i = 0; i < n; i++)
|
|
||||||
if (pp->children[i].state == GIT_CP_WORKING)
|
if (pp->children[i].state == GIT_CP_WORKING)
|
||||||
kill(pp->children[i].process.pid, signo);
|
kill(pp->children[i].process.pid, signo);
|
||||||
}
|
}
|
||||||
|
|
||||||
static struct parallel_processes *pp_for_signal;
|
static void kill_children_signal(const struct parallel_processes_for_signal *pp_sig,
|
||||||
|
int signo)
|
||||||
|
{
|
||||||
|
kill_children(pp_sig->pp, pp_sig->opts, signo);
|
||||||
|
}
|
||||||
|
|
||||||
|
static struct parallel_processes_for_signal *pp_for_signal;
|
||||||
|
|
||||||
static void handle_children_on_signal(int signo)
|
static void handle_children_on_signal(int signo)
|
||||||
{
|
{
|
||||||
kill_children(pp_for_signal, signo);
|
kill_children_signal(pp_for_signal, signo);
|
||||||
sigchain_pop(signo);
|
sigchain_pop(signo);
|
||||||
raise(signo);
|
raise(signo);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void pp_init(struct parallel_processes *pp,
|
static void pp_init(struct parallel_processes *pp,
|
||||||
int n,
|
const struct run_process_parallel_opts *opts,
|
||||||
get_next_task_fn get_next_task,
|
struct parallel_processes_for_signal *pp_sig)
|
||||||
start_failure_fn start_failure,
|
|
||||||
task_finished_fn task_finished,
|
|
||||||
void *data, int ungroup)
|
|
||||||
{
|
{
|
||||||
int i;
|
const size_t n = opts->processes;
|
||||||
|
|
||||||
if (n < 1)
|
if (!n)
|
||||||
n = online_cpus();
|
BUG("you must provide a non-zero number of processes!");
|
||||||
|
|
||||||
pp->max_processes = n;
|
trace_printf("run_processes_parallel: preparing to run up to %"PRIuMAX" tasks",
|
||||||
|
(uintmax_t)n);
|
||||||
|
|
||||||
trace_printf("run_processes_parallel: preparing to run up to %d tasks", n);
|
if (!opts->get_next_task)
|
||||||
|
|
||||||
pp->data = data;
|
|
||||||
if (!get_next_task)
|
|
||||||
BUG("you need to specify a get_next_task function");
|
BUG("you need to specify a get_next_task function");
|
||||||
pp->get_next_task = get_next_task;
|
|
||||||
|
|
||||||
pp->start_failure = start_failure ? start_failure : default_start_failure;
|
|
||||||
pp->task_finished = task_finished ? task_finished : default_task_finished;
|
|
||||||
|
|
||||||
pp->nr_processes = 0;
|
|
||||||
pp->output_owner = 0;
|
|
||||||
pp->shutdown = 0;
|
|
||||||
pp->ungroup = ungroup;
|
|
||||||
CALLOC_ARRAY(pp->children, n);
|
CALLOC_ARRAY(pp->children, n);
|
||||||
if (pp->ungroup)
|
if (!opts->ungroup)
|
||||||
pp->pfd = NULL;
|
|
||||||
else
|
|
||||||
CALLOC_ARRAY(pp->pfd, n);
|
CALLOC_ARRAY(pp->pfd, n);
|
||||||
strbuf_init(&pp->buffered_output, 0);
|
|
||||||
|
|
||||||
for (i = 0; i < n; i++) {
|
for (size_t i = 0; i < n; i++) {
|
||||||
strbuf_init(&pp->children[i].err, 0);
|
strbuf_init(&pp->children[i].err, 0);
|
||||||
child_process_init(&pp->children[i].process);
|
child_process_init(&pp->children[i].process);
|
||||||
if (pp->pfd) {
|
if (pp->pfd) {
|
||||||
@ -1603,16 +1574,17 @@ static void pp_init(struct parallel_processes *pp,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pp_for_signal = pp;
|
pp_sig->pp = pp;
|
||||||
|
pp_sig->opts = opts;
|
||||||
|
pp_for_signal = pp_sig;
|
||||||
sigchain_push_common(handle_children_on_signal);
|
sigchain_push_common(handle_children_on_signal);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void pp_cleanup(struct parallel_processes *pp)
|
static void pp_cleanup(struct parallel_processes *pp,
|
||||||
|
const struct run_process_parallel_opts *opts)
|
||||||
{
|
{
|
||||||
int i;
|
|
||||||
|
|
||||||
trace_printf("run_processes_parallel: done");
|
trace_printf("run_processes_parallel: done");
|
||||||
for (i = 0; i < pp->max_processes; i++) {
|
for (size_t i = 0; i < opts->processes; i++) {
|
||||||
strbuf_release(&pp->children[i].err);
|
strbuf_release(&pp->children[i].err);
|
||||||
child_process_clear(&pp->children[i].process);
|
child_process_clear(&pp->children[i].process);
|
||||||
}
|
}
|
||||||
@ -1637,39 +1609,45 @@ static void pp_cleanup(struct parallel_processes *pp)
|
|||||||
* <0 no new job was started, user wishes to shutdown early. Use negative code
|
* <0 no new job was started, user wishes to shutdown early. Use negative code
|
||||||
* to signal the children.
|
* to signal the children.
|
||||||
*/
|
*/
|
||||||
static int pp_start_one(struct parallel_processes *pp)
|
static int pp_start_one(struct parallel_processes *pp,
|
||||||
|
const struct run_process_parallel_opts *opts)
|
||||||
{
|
{
|
||||||
int i, code;
|
size_t i;
|
||||||
|
int code;
|
||||||
|
|
||||||
for (i = 0; i < pp->max_processes; i++)
|
for (i = 0; i < opts->processes; i++)
|
||||||
if (pp->children[i].state == GIT_CP_FREE)
|
if (pp->children[i].state == GIT_CP_FREE)
|
||||||
break;
|
break;
|
||||||
if (i == pp->max_processes)
|
if (i == opts->processes)
|
||||||
BUG("bookkeeping is hard");
|
BUG("bookkeeping is hard");
|
||||||
|
|
||||||
code = pp->get_next_task(&pp->children[i].process,
|
code = opts->get_next_task(&pp->children[i].process,
|
||||||
pp->ungroup ? NULL : &pp->children[i].err,
|
opts->ungroup ? NULL : &pp->children[i].err,
|
||||||
pp->data,
|
opts->data,
|
||||||
&pp->children[i].data);
|
&pp->children[i].data);
|
||||||
if (!code) {
|
if (!code) {
|
||||||
if (!pp->ungroup) {
|
if (!opts->ungroup) {
|
||||||
strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
|
strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
|
||||||
strbuf_reset(&pp->children[i].err);
|
strbuf_reset(&pp->children[i].err);
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
if (!pp->ungroup) {
|
if (!opts->ungroup) {
|
||||||
pp->children[i].process.err = -1;
|
pp->children[i].process.err = -1;
|
||||||
pp->children[i].process.stdout_to_stderr = 1;
|
pp->children[i].process.stdout_to_stderr = 1;
|
||||||
}
|
}
|
||||||
pp->children[i].process.no_stdin = 1;
|
pp->children[i].process.no_stdin = 1;
|
||||||
|
|
||||||
if (start_command(&pp->children[i].process)) {
|
if (start_command(&pp->children[i].process)) {
|
||||||
code = pp->start_failure(pp->ungroup ? NULL :
|
if (opts->start_failure)
|
||||||
&pp->children[i].err,
|
code = opts->start_failure(opts->ungroup ? NULL :
|
||||||
pp->data,
|
&pp->children[i].err,
|
||||||
pp->children[i].data);
|
opts->data,
|
||||||
if (!pp->ungroup) {
|
pp->children[i].data);
|
||||||
|
else
|
||||||
|
code = 0;
|
||||||
|
|
||||||
|
if (!opts->ungroup) {
|
||||||
strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
|
strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
|
||||||
strbuf_reset(&pp->children[i].err);
|
strbuf_reset(&pp->children[i].err);
|
||||||
}
|
}
|
||||||
@ -1685,19 +1663,21 @@ static int pp_start_one(struct parallel_processes *pp)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
|
static void pp_buffer_stderr(struct parallel_processes *pp,
|
||||||
|
const struct run_process_parallel_opts *opts,
|
||||||
|
int output_timeout)
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
while ((i = poll(pp->pfd, pp->max_processes, output_timeout)) < 0) {
|
while ((i = poll(pp->pfd, opts->processes, output_timeout) < 0)) {
|
||||||
if (errno == EINTR)
|
if (errno == EINTR)
|
||||||
continue;
|
continue;
|
||||||
pp_cleanup(pp);
|
pp_cleanup(pp, opts);
|
||||||
die_errno("poll");
|
die_errno("poll");
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Buffer output from all pipes. */
|
/* Buffer output from all pipes. */
|
||||||
for (i = 0; i < pp->max_processes; i++) {
|
for (size_t i = 0; i < opts->processes; i++) {
|
||||||
if (pp->children[i].state == GIT_CP_WORKING &&
|
if (pp->children[i].state == GIT_CP_WORKING &&
|
||||||
pp->pfd[i].revents & (POLLIN | POLLHUP)) {
|
pp->pfd[i].revents & (POLLIN | POLLHUP)) {
|
||||||
int n = strbuf_read_once(&pp->children[i].err,
|
int n = strbuf_read_once(&pp->children[i].err,
|
||||||
@ -1712,9 +1692,9 @@ static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void pp_output(struct parallel_processes *pp)
|
static void pp_output(const struct parallel_processes *pp)
|
||||||
{
|
{
|
||||||
int i = pp->output_owner;
|
size_t i = pp->output_owner;
|
||||||
|
|
||||||
if (pp->children[i].state == GIT_CP_WORKING &&
|
if (pp->children[i].state == GIT_CP_WORKING &&
|
||||||
pp->children[i].err.len) {
|
pp->children[i].err.len) {
|
||||||
@ -1723,24 +1703,28 @@ static void pp_output(struct parallel_processes *pp)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int pp_collect_finished(struct parallel_processes *pp)
|
static int pp_collect_finished(struct parallel_processes *pp,
|
||||||
|
const struct run_process_parallel_opts *opts)
|
||||||
{
|
{
|
||||||
int i, code;
|
int code;
|
||||||
int n = pp->max_processes;
|
size_t i;
|
||||||
int result = 0;
|
int result = 0;
|
||||||
|
|
||||||
while (pp->nr_processes > 0) {
|
while (pp->nr_processes > 0) {
|
||||||
for (i = 0; i < pp->max_processes; i++)
|
for (i = 0; i < opts->processes; i++)
|
||||||
if (pp->children[i].state == GIT_CP_WAIT_CLEANUP)
|
if (pp->children[i].state == GIT_CP_WAIT_CLEANUP)
|
||||||
break;
|
break;
|
||||||
if (i == pp->max_processes)
|
if (i == opts->processes)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
code = finish_command(&pp->children[i].process);
|
code = finish_command(&pp->children[i].process);
|
||||||
|
|
||||||
code = pp->task_finished(code, pp->ungroup ? NULL :
|
if (opts->task_finished)
|
||||||
&pp->children[i].err, pp->data,
|
code = opts->task_finished(code, opts->ungroup ? NULL :
|
||||||
pp->children[i].data);
|
&pp->children[i].err, opts->data,
|
||||||
|
pp->children[i].data);
|
||||||
|
else
|
||||||
|
code = 0;
|
||||||
|
|
||||||
if (code)
|
if (code)
|
||||||
result = code;
|
result = code;
|
||||||
@ -1753,12 +1737,14 @@ static int pp_collect_finished(struct parallel_processes *pp)
|
|||||||
pp->pfd[i].fd = -1;
|
pp->pfd[i].fd = -1;
|
||||||
child_process_init(&pp->children[i].process);
|
child_process_init(&pp->children[i].process);
|
||||||
|
|
||||||
if (pp->ungroup) {
|
if (opts->ungroup) {
|
||||||
; /* no strbuf_*() work to do here */
|
; /* no strbuf_*() work to do here */
|
||||||
} else if (i != pp->output_owner) {
|
} else if (i != pp->output_owner) {
|
||||||
strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
|
strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
|
||||||
strbuf_reset(&pp->children[i].err);
|
strbuf_reset(&pp->children[i].err);
|
||||||
} else {
|
} else {
|
||||||
|
const size_t n = opts->processes;
|
||||||
|
|
||||||
strbuf_write(&pp->children[i].err, stderr);
|
strbuf_write(&pp->children[i].err, stderr);
|
||||||
strbuf_reset(&pp->children[i].err);
|
strbuf_reset(&pp->children[i].err);
|
||||||
|
|
||||||
@ -1783,76 +1769,60 @@ static int pp_collect_finished(struct parallel_processes *pp)
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
int run_processes_parallel(int n,
|
void run_processes_parallel(const struct run_process_parallel_opts *opts)
|
||||||
get_next_task_fn get_next_task,
|
|
||||||
start_failure_fn start_failure,
|
|
||||||
task_finished_fn task_finished,
|
|
||||||
void *pp_cb)
|
|
||||||
{
|
{
|
||||||
int i, code;
|
int i, code;
|
||||||
int output_timeout = 100;
|
int output_timeout = 100;
|
||||||
int spawn_cap = 4;
|
int spawn_cap = 4;
|
||||||
int ungroup = run_processes_parallel_ungroup;
|
struct parallel_processes_for_signal pp_sig;
|
||||||
struct parallel_processes pp;
|
struct parallel_processes pp = {
|
||||||
|
.buffered_output = STRBUF_INIT,
|
||||||
|
};
|
||||||
|
/* options */
|
||||||
|
const char *tr2_category = opts->tr2_category;
|
||||||
|
const char *tr2_label = opts->tr2_label;
|
||||||
|
const int do_trace2 = tr2_category && tr2_label;
|
||||||
|
|
||||||
/* unset for the next API user */
|
if (do_trace2)
|
||||||
run_processes_parallel_ungroup = 0;
|
trace2_region_enter_printf(tr2_category, tr2_label, NULL,
|
||||||
|
"max:%d", opts->processes);
|
||||||
|
|
||||||
pp_init(&pp, n, get_next_task, start_failure, task_finished, pp_cb,
|
pp_init(&pp, opts, &pp_sig);
|
||||||
ungroup);
|
|
||||||
while (1) {
|
while (1) {
|
||||||
for (i = 0;
|
for (i = 0;
|
||||||
i < spawn_cap && !pp.shutdown &&
|
i < spawn_cap && !pp.shutdown &&
|
||||||
pp.nr_processes < pp.max_processes;
|
pp.nr_processes < opts->processes;
|
||||||
i++) {
|
i++) {
|
||||||
code = pp_start_one(&pp);
|
code = pp_start_one(&pp, opts);
|
||||||
if (!code)
|
if (!code)
|
||||||
continue;
|
continue;
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
pp.shutdown = 1;
|
pp.shutdown = 1;
|
||||||
kill_children(&pp, -code);
|
kill_children(&pp, opts, -code);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (!pp.nr_processes)
|
if (!pp.nr_processes)
|
||||||
break;
|
break;
|
||||||
if (ungroup) {
|
if (opts->ungroup) {
|
||||||
int i;
|
for (size_t i = 0; i < opts->processes; i++)
|
||||||
|
|
||||||
for (i = 0; i < pp.max_processes; i++)
|
|
||||||
pp.children[i].state = GIT_CP_WAIT_CLEANUP;
|
pp.children[i].state = GIT_CP_WAIT_CLEANUP;
|
||||||
} else {
|
} else {
|
||||||
pp_buffer_stderr(&pp, output_timeout);
|
pp_buffer_stderr(&pp, opts, output_timeout);
|
||||||
pp_output(&pp);
|
pp_output(&pp);
|
||||||
}
|
}
|
||||||
code = pp_collect_finished(&pp);
|
code = pp_collect_finished(&pp, opts);
|
||||||
if (code) {
|
if (code) {
|
||||||
pp.shutdown = 1;
|
pp.shutdown = 1;
|
||||||
if (code < 0)
|
if (code < 0)
|
||||||
kill_children(&pp, -code);
|
kill_children(&pp, opts,-code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pp_cleanup(&pp);
|
pp_cleanup(&pp, opts);
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int run_processes_parallel_tr2(int n, get_next_task_fn get_next_task,
|
if (do_trace2)
|
||||||
start_failure_fn start_failure,
|
trace2_region_leave(tr2_category, tr2_label, NULL);
|
||||||
task_finished_fn task_finished, void *pp_cb,
|
|
||||||
const char *tr2_category, const char *tr2_label)
|
|
||||||
{
|
|
||||||
int result;
|
|
||||||
|
|
||||||
trace2_region_enter_printf(tr2_category, tr2_label, NULL, "max:%d",
|
|
||||||
((n < 1) ? online_cpus() : n));
|
|
||||||
|
|
||||||
result = run_processes_parallel(n, get_next_task, start_failure,
|
|
||||||
task_finished, pp_cb);
|
|
||||||
|
|
||||||
trace2_region_leave(tr2_category, tr2_label, NULL);
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int run_auto_maintenance(int quiet)
|
int run_auto_maintenance(int quiet)
|
||||||
|
@ -459,17 +459,64 @@ typedef int (*task_finished_fn)(int result,
|
|||||||
void *pp_task_cb);
|
void *pp_task_cb);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs up to n processes at the same time. Whenever a process can be
|
* Option used by run_processes_parallel(), { 0 }-initialized means no
|
||||||
* started, the callback get_next_task_fn is called to obtain the data
|
* options.
|
||||||
|
*/
|
||||||
|
struct run_process_parallel_opts
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* tr2_category & tr2_label: sets the trace2 category and label for
|
||||||
|
* logging. These must either be unset, or both of them must be set.
|
||||||
|
*/
|
||||||
|
const char *tr2_category;
|
||||||
|
const char *tr2_label;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* processes: see 'processes' in run_processes_parallel() below.
|
||||||
|
*/
|
||||||
|
size_t processes;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ungroup: see 'ungroup' in run_processes_parallel() below.
|
||||||
|
*/
|
||||||
|
unsigned int ungroup:1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* get_next_task: See get_next_task_fn() above. This must be
|
||||||
|
* specified.
|
||||||
|
*/
|
||||||
|
get_next_task_fn get_next_task;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* start_failure: See start_failure_fn() above. This can be
|
||||||
|
* NULL to omit any special handling.
|
||||||
|
*/
|
||||||
|
start_failure_fn start_failure;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* task_finished: See task_finished_fn() above. This can be
|
||||||
|
* NULL to omit any special handling.
|
||||||
|
*/
|
||||||
|
task_finished_fn task_finished;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* data: user data, will be passed as "pp_cb" to the callback
|
||||||
|
* parameters.
|
||||||
|
*/
|
||||||
|
void *data;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Options are passed via the "struct run_process_parallel_opts" above.
|
||||||
|
*
|
||||||
|
* Runs N 'processes' at the same time. Whenever a process can be
|
||||||
|
* started, the callback opts.get_next_task is called to obtain the data
|
||||||
* required to start another child process.
|
* required to start another child process.
|
||||||
*
|
*
|
||||||
* The children started via this function run in parallel. Their output
|
* The children started via this function run in parallel. Their output
|
||||||
* (both stdout and stderr) is routed to stderr in a manner that output
|
* (both stdout and stderr) is routed to stderr in a manner that output
|
||||||
* from different tasks does not interleave (but see "ungroup" below).
|
* from different tasks does not interleave (but see "ungroup" below).
|
||||||
*
|
*
|
||||||
* start_failure_fn and task_finished_fn can be NULL to omit any
|
|
||||||
* special handling.
|
|
||||||
*
|
|
||||||
* If the "ungroup" option isn't specified, the API will set the
|
* If the "ungroup" option isn't specified, the API will set the
|
||||||
* "stdout_to_stderr" parameter in "struct child_process" and provide
|
* "stdout_to_stderr" parameter in "struct child_process" and provide
|
||||||
* the callbacks with a "struct strbuf *out" parameter to write output
|
* the callbacks with a "struct strbuf *out" parameter to write output
|
||||||
@ -479,20 +526,8 @@ typedef int (*task_finished_fn)(int result,
|
|||||||
* NULL "struct strbuf *out" parameter, and are responsible for
|
* NULL "struct strbuf *out" parameter, and are responsible for
|
||||||
* emitting their own output, including dealing with any race
|
* emitting their own output, including dealing with any race
|
||||||
* conditions due to writing in parallel to stdout and stderr.
|
* conditions due to writing in parallel to stdout and stderr.
|
||||||
* The "ungroup" option can be enabled by setting the global
|
|
||||||
* "run_processes_parallel_ungroup" to "1" before invoking
|
|
||||||
* run_processes_parallel(), it will be set back to "0" as soon as the
|
|
||||||
* API reads that setting.
|
|
||||||
*/
|
*/
|
||||||
extern int run_processes_parallel_ungroup;
|
void run_processes_parallel(const struct run_process_parallel_opts *opts);
|
||||||
int run_processes_parallel(int n,
|
|
||||||
get_next_task_fn,
|
|
||||||
start_failure_fn,
|
|
||||||
task_finished_fn,
|
|
||||||
void *pp_cb);
|
|
||||||
int run_processes_parallel_tr2(int n, get_next_task_fn, start_failure_fn,
|
|
||||||
task_finished_fn, void *pp_cb,
|
|
||||||
const char *tr2_category, const char *tr2_label);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convenience function which prepares env for a command to be run in a
|
* Convenience function which prepares env for a command to be run in a
|
||||||
|
@ -303,6 +303,8 @@ int parse_submodule_fetchjobs(const char *var, const char *value)
|
|||||||
int fetchjobs = git_config_int(var, value);
|
int fetchjobs = git_config_int(var, value);
|
||||||
if (fetchjobs < 0)
|
if (fetchjobs < 0)
|
||||||
die(_("negative values not allowed for submodule.fetchJobs"));
|
die(_("negative values not allowed for submodule.fetchJobs"));
|
||||||
|
if (!fetchjobs)
|
||||||
|
fetchjobs = online_cpus();
|
||||||
return fetchjobs;
|
return fetchjobs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
18
submodule.c
18
submodule.c
@ -1819,6 +1819,17 @@ int fetch_submodules(struct repository *r,
|
|||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
struct submodule_parallel_fetch spf = SPF_INIT;
|
struct submodule_parallel_fetch spf = SPF_INIT;
|
||||||
|
const struct run_process_parallel_opts opts = {
|
||||||
|
.tr2_category = "submodule",
|
||||||
|
.tr2_label = "parallel/fetch",
|
||||||
|
|
||||||
|
.processes = max_parallel_jobs,
|
||||||
|
|
||||||
|
.get_next_task = get_next_submodule,
|
||||||
|
.start_failure = fetch_start_failure,
|
||||||
|
.task_finished = fetch_finish,
|
||||||
|
.data = &spf,
|
||||||
|
};
|
||||||
|
|
||||||
spf.r = r;
|
spf.r = r;
|
||||||
spf.command_line_option = command_line_option;
|
spf.command_line_option = command_line_option;
|
||||||
@ -1840,12 +1851,7 @@ int fetch_submodules(struct repository *r,
|
|||||||
|
|
||||||
calculate_changed_submodule_paths(r, &spf.changed_submodule_names);
|
calculate_changed_submodule_paths(r, &spf.changed_submodule_names);
|
||||||
string_list_sort(&spf.changed_submodule_names);
|
string_list_sort(&spf.changed_submodule_names);
|
||||||
run_processes_parallel_tr2(max_parallel_jobs,
|
run_processes_parallel(&opts);
|
||||||
get_next_submodule,
|
|
||||||
fetch_start_failure,
|
|
||||||
fetch_finish,
|
|
||||||
&spf,
|
|
||||||
"submodule", "parallel/fetch");
|
|
||||||
|
|
||||||
if (spf.submodules_with_errors.len > 0)
|
if (spf.submodules_with_errors.len > 0)
|
||||||
fprintf(stderr, _("Errors during submodule fetch:\n%s"),
|
fprintf(stderr, _("Errors during submodule fetch:\n%s"),
|
||||||
|
@ -136,7 +136,7 @@ static const char * const testsuite_usage[] = {
|
|||||||
static int testsuite(int argc, const char **argv)
|
static int testsuite(int argc, const char **argv)
|
||||||
{
|
{
|
||||||
struct testsuite suite = TESTSUITE_INIT;
|
struct testsuite suite = TESTSUITE_INIT;
|
||||||
int max_jobs = 1, i, ret;
|
int max_jobs = 1, i, ret = 0;
|
||||||
DIR *dir;
|
DIR *dir;
|
||||||
struct dirent *d;
|
struct dirent *d;
|
||||||
struct option options[] = {
|
struct option options[] = {
|
||||||
@ -152,6 +152,12 @@ static int testsuite(int argc, const char **argv)
|
|||||||
"write JUnit-style XML files"),
|
"write JUnit-style XML files"),
|
||||||
OPT_END()
|
OPT_END()
|
||||||
};
|
};
|
||||||
|
struct run_process_parallel_opts opts = {
|
||||||
|
.get_next_task = next_test,
|
||||||
|
.start_failure = test_failed,
|
||||||
|
.task_finished = test_finished,
|
||||||
|
.data = &suite,
|
||||||
|
};
|
||||||
|
|
||||||
argc = parse_options(argc, argv, NULL, options,
|
argc = parse_options(argc, argv, NULL, options,
|
||||||
testsuite_usage, PARSE_OPT_STOP_AT_NON_OPTION);
|
testsuite_usage, PARSE_OPT_STOP_AT_NON_OPTION);
|
||||||
@ -192,8 +198,8 @@ static int testsuite(int argc, const char **argv)
|
|||||||
fprintf(stderr, "Running %"PRIuMAX" tests (%d at a time)\n",
|
fprintf(stderr, "Running %"PRIuMAX" tests (%d at a time)\n",
|
||||||
(uintmax_t)suite.tests.nr, max_jobs);
|
(uintmax_t)suite.tests.nr, max_jobs);
|
||||||
|
|
||||||
ret = run_processes_parallel(max_jobs, next_test, test_failed,
|
opts.processes = max_jobs;
|
||||||
test_finished, &suite);
|
run_processes_parallel(&opts);
|
||||||
|
|
||||||
if (suite.failed.nr > 0) {
|
if (suite.failed.nr > 0) {
|
||||||
ret = 1;
|
ret = 1;
|
||||||
@ -206,7 +212,7 @@ static int testsuite(int argc, const char **argv)
|
|||||||
string_list_clear(&suite.tests, 0);
|
string_list_clear(&suite.tests, 0);
|
||||||
string_list_clear(&suite.failed, 0);
|
string_list_clear(&suite.failed, 0);
|
||||||
|
|
||||||
return !!ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static uint64_t my_random_next = 1234;
|
static uint64_t my_random_next = 1234;
|
||||||
@ -381,13 +387,17 @@ int cmd__run_command(int argc, const char **argv)
|
|||||||
{
|
{
|
||||||
struct child_process proc = CHILD_PROCESS_INIT;
|
struct child_process proc = CHILD_PROCESS_INIT;
|
||||||
int jobs;
|
int jobs;
|
||||||
|
int ret;
|
||||||
|
struct run_process_parallel_opts opts = {
|
||||||
|
.data = &proc,
|
||||||
|
};
|
||||||
|
|
||||||
if (argc > 1 && !strcmp(argv[1], "testsuite"))
|
if (argc > 1 && !strcmp(argv[1], "testsuite"))
|
||||||
exit(testsuite(argc - 1, argv + 1));
|
return testsuite(argc - 1, argv + 1);
|
||||||
if (!strcmp(argv[1], "inherited-handle"))
|
if (!strcmp(argv[1], "inherited-handle"))
|
||||||
exit(inherit_handle(argv[0]));
|
return inherit_handle(argv[0]);
|
||||||
if (!strcmp(argv[1], "inherited-handle-child"))
|
if (!strcmp(argv[1], "inherited-handle-child"))
|
||||||
exit(inherit_handle_child());
|
return inherit_handle_child();
|
||||||
|
|
||||||
if (argc >= 2 && !strcmp(argv[1], "quote-stress-test"))
|
if (argc >= 2 && !strcmp(argv[1], "quote-stress-test"))
|
||||||
return !!quote_stress_test(argc - 1, argv + 1);
|
return !!quote_stress_test(argc - 1, argv + 1);
|
||||||
@ -404,41 +414,52 @@ int cmd__run_command(int argc, const char **argv)
|
|||||||
argv += 2;
|
argv += 2;
|
||||||
argc -= 2;
|
argc -= 2;
|
||||||
}
|
}
|
||||||
if (argc < 3)
|
if (argc < 3) {
|
||||||
return 1;
|
ret = 1;
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
strvec_pushv(&proc.args, (const char **)argv + 2);
|
strvec_pushv(&proc.args, (const char **)argv + 2);
|
||||||
|
|
||||||
if (!strcmp(argv[1], "start-command-ENOENT")) {
|
if (!strcmp(argv[1], "start-command-ENOENT")) {
|
||||||
if (start_command(&proc) < 0 && errno == ENOENT)
|
if (start_command(&proc) < 0 && errno == ENOENT) {
|
||||||
return 0;
|
ret = 0;
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
fprintf(stderr, "FAIL %s\n", argv[1]);
|
fprintf(stderr, "FAIL %s\n", argv[1]);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
if (!strcmp(argv[1], "run-command"))
|
if (!strcmp(argv[1], "run-command")) {
|
||||||
exit(run_command(&proc));
|
ret = run_command(&proc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
if (!strcmp(argv[1], "--ungroup")) {
|
if (!strcmp(argv[1], "--ungroup")) {
|
||||||
argv += 1;
|
argv += 1;
|
||||||
argc -= 1;
|
argc -= 1;
|
||||||
run_processes_parallel_ungroup = 1;
|
opts.ungroup = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
jobs = atoi(argv[2]);
|
jobs = atoi(argv[2]);
|
||||||
strvec_clear(&proc.args);
|
strvec_clear(&proc.args);
|
||||||
strvec_pushv(&proc.args, (const char **)argv + 3);
|
strvec_pushv(&proc.args, (const char **)argv + 3);
|
||||||
|
|
||||||
if (!strcmp(argv[1], "run-command-parallel"))
|
if (!strcmp(argv[1], "run-command-parallel")) {
|
||||||
exit(run_processes_parallel(jobs, parallel_next,
|
opts.get_next_task = parallel_next;
|
||||||
NULL, NULL, &proc));
|
} else if (!strcmp(argv[1], "run-command-abort")) {
|
||||||
|
opts.get_next_task = parallel_next;
|
||||||
if (!strcmp(argv[1], "run-command-abort"))
|
opts.task_finished = task_finished;
|
||||||
exit(run_processes_parallel(jobs, parallel_next,
|
} else if (!strcmp(argv[1], "run-command-no-jobs")) {
|
||||||
NULL, task_finished, &proc));
|
opts.get_next_task = no_job;
|
||||||
|
opts.task_finished = task_finished;
|
||||||
if (!strcmp(argv[1], "run-command-no-jobs"))
|
} else {
|
||||||
exit(run_processes_parallel(jobs, no_job,
|
ret = 1;
|
||||||
NULL, task_finished, &proc));
|
fprintf(stderr, "check usage\n");
|
||||||
|
goto cleanup;
|
||||||
fprintf(stderr, "check usage\n");
|
}
|
||||||
return 1;
|
opts.processes = jobs;
|
||||||
|
run_processes_parallel(&opts);
|
||||||
|
ret = 0;
|
||||||
|
cleanup:
|
||||||
|
child_process_clear(&proc);
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -715,7 +715,13 @@ test_expect_success 'fetching submodules respects parallel settings' '
|
|||||||
GIT_TRACE=$(pwd)/trace.out git fetch &&
|
GIT_TRACE=$(pwd)/trace.out git fetch &&
|
||||||
grep "8 tasks" trace.out &&
|
grep "8 tasks" trace.out &&
|
||||||
GIT_TRACE=$(pwd)/trace.out git fetch --jobs 9 &&
|
GIT_TRACE=$(pwd)/trace.out git fetch --jobs 9 &&
|
||||||
grep "9 tasks" trace.out
|
grep "9 tasks" trace.out &&
|
||||||
|
>trace.out &&
|
||||||
|
|
||||||
|
GIT_TRACE=$(pwd)/trace.out git -c submodule.fetchJobs=0 fetch &&
|
||||||
|
grep "preparing to run up to [0-9]* tasks" trace.out &&
|
||||||
|
! grep "up to 0 tasks" trace.out &&
|
||||||
|
>trace.out
|
||||||
)
|
)
|
||||||
'
|
'
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user