Merge branch 'sb/submodule-parallel-fetch'

Add a framework to spawn a group of processes in parallel, and use
it to run "git fetch --recurse-submodules" in parallel.

Rerolled and this seems to be a lot cleaner.  The merge of the
earlier one to 'next' has been reverted.

* sb/submodule-parallel-fetch:
  submodules: allow parallel fetching, add tests and documentation
  fetch_populated_submodules: use new parallel job processing
  run-command: add an asynchronous parallel child processor
  sigchain: add command to pop all common signals
  strbuf: add strbuf_read_once to read without blocking
  xread: poll on non blocking fds
  submodule.c: write "Fetching submodule <foo>" to stderr
This commit is contained in:
Junio C Hamano 2016-01-12 15:16:54 -08:00
commit 187c0d3d9e
15 changed files with 731 additions and 74 deletions

View File

@ -100,6 +100,13 @@ ifndef::git-pull[]
reference to a commit that isn't already in the local submodule reference to a commit that isn't already in the local submodule
clone. clone.
-j::
--jobs=<n>::
Number of parallel children to be used for fetching submodules.
Each will fetch from different submodules, such that fetching many
submodules will be faster. By default submodules will be fetched
one at a time.
--no-recurse-submodules:: --no-recurse-submodules::
Disable recursive fetching of submodules (this has the same effect as Disable recursive fetching of submodules (this has the same effect as
using the '--recurse-submodules=no' option). using the '--recurse-submodules=no' option).

View File

@ -37,6 +37,7 @@ static int prune = -1; /* unspecified */
static int all, append, dry_run, force, keep, multiple, update_head_ok, verbosity; static int all, append, dry_run, force, keep, multiple, update_head_ok, verbosity;
static int progress = -1, recurse_submodules = RECURSE_SUBMODULES_DEFAULT; static int progress = -1, recurse_submodules = RECURSE_SUBMODULES_DEFAULT;
static int tags = TAGS_DEFAULT, unshallow, update_shallow; static int tags = TAGS_DEFAULT, unshallow, update_shallow;
static int max_children = 1;
static const char *depth; static const char *depth;
static const char *upload_pack; static const char *upload_pack;
static struct strbuf default_rla = STRBUF_INIT; static struct strbuf default_rla = STRBUF_INIT;
@ -99,6 +100,8 @@ static struct option builtin_fetch_options[] = {
N_("fetch all tags and associated objects"), TAGS_SET), N_("fetch all tags and associated objects"), TAGS_SET),
OPT_SET_INT('n', NULL, &tags, OPT_SET_INT('n', NULL, &tags,
N_("do not fetch all tags (--no-tags)"), TAGS_UNSET), N_("do not fetch all tags (--no-tags)"), TAGS_UNSET),
OPT_INTEGER('j', "jobs", &max_children,
N_("number of submodules fetched in parallel")),
OPT_BOOL('p', "prune", &prune, OPT_BOOL('p', "prune", &prune,
N_("prune remote-tracking branches no longer on remote")), N_("prune remote-tracking branches no longer on remote")),
{ OPTION_CALLBACK, 0, "recurse-submodules", NULL, N_("on-demand"), { OPTION_CALLBACK, 0, "recurse-submodules", NULL, N_("on-demand"),
@ -1213,7 +1216,8 @@ int cmd_fetch(int argc, const char **argv, const char *prefix)
result = fetch_populated_submodules(&options, result = fetch_populated_submodules(&options,
submodule_prefix, submodule_prefix,
recurse_submodules, recurse_submodules,
verbosity < 0); verbosity < 0,
max_children);
argv_array_clear(&options); argv_array_clear(&options);
} }

View File

@ -95,6 +95,7 @@ static int opt_force;
static char *opt_tags; static char *opt_tags;
static char *opt_prune; static char *opt_prune;
static char *opt_recurse_submodules; static char *opt_recurse_submodules;
static char *max_children;
static int opt_dry_run; static int opt_dry_run;
static char *opt_keep; static char *opt_keep;
static char *opt_depth; static char *opt_depth;
@ -178,6 +179,9 @@ static struct option pull_options[] = {
N_("on-demand"), N_("on-demand"),
N_("control recursive fetching of submodules"), N_("control recursive fetching of submodules"),
PARSE_OPT_OPTARG), PARSE_OPT_OPTARG),
OPT_PASSTHRU('j', "jobs", &max_children, N_("n"),
N_("number of submodules pulled in parallel"),
PARSE_OPT_OPTARG),
OPT_BOOL(0, "dry-run", &opt_dry_run, OPT_BOOL(0, "dry-run", &opt_dry_run,
N_("dry run")), N_("dry run")),
OPT_PASSTHRU('k', "keep", &opt_keep, NULL, OPT_PASSTHRU('k', "keep", &opt_keep, NULL,
@ -525,6 +529,8 @@ static int run_fetch(const char *repo, const char **refspecs)
argv_array_push(&args, opt_prune); argv_array_push(&args, opt_prune);
if (opt_recurse_submodules) if (opt_recurse_submodules)
argv_array_push(&args, opt_recurse_submodules); argv_array_push(&args, opt_recurse_submodules);
if (max_children)
argv_array_push(&args, max_children);
if (opt_dry_run) if (opt_dry_run)
argv_array_push(&args, "--dry-run"); argv_array_push(&args, "--dry-run");
if (opt_keep) if (opt_keep)

View File

@ -3,6 +3,8 @@
#include "exec_cmd.h" #include "exec_cmd.h"
#include "sigchain.h" #include "sigchain.h"
#include "argv-array.h" #include "argv-array.h"
#include "thread-utils.h"
#include "strbuf.h"
void child_process_init(struct child_process *child) void child_process_init(struct child_process *child)
{ {
@ -865,3 +867,336 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
close(cmd->out); close(cmd->out);
return finish_command(cmd); return finish_command(cmd);
} }
enum child_state {
GIT_CP_FREE,
GIT_CP_WORKING,
GIT_CP_WAIT_CLEANUP,
};
struct parallel_processes {
void *data;
int max_processes;
int nr_processes;
get_next_task_fn get_next_task;
start_failure_fn start_failure;
task_finished_fn task_finished;
struct {
enum child_state state;
struct child_process process;
struct strbuf err;
void *data;
} *children;
/*
* The struct pollfd is logically part of *children,
* but the system call expects it as its own array.
*/
struct pollfd *pfd;
unsigned shutdown : 1;
int output_owner;
struct strbuf buffered_output; /* of finished children */
};
static int default_start_failure(struct child_process *cp,
struct strbuf *err,
void *pp_cb,
void *pp_task_cb)
{
int i;
strbuf_addstr(err, "Starting a child failed:");
for (i = 0; cp->argv[i]; i++)
strbuf_addf(err, " %s", cp->argv[i]);
return 0;
}
static int default_task_finished(int result,
struct child_process *cp,
struct strbuf *err,
void *pp_cb,
void *pp_task_cb)
{
int i;
if (!result)
return 0;
strbuf_addf(err, "A child failed with return code %d:", result);
for (i = 0; cp->argv[i]; i++)
strbuf_addf(err, " %s", cp->argv[i]);
return 0;
}
static void kill_children(struct parallel_processes *pp, int signo)
{
int i, n = pp->max_processes;
for (i = 0; i < n; i++)
if (pp->children[i].state == GIT_CP_WORKING)
kill(pp->children[i].process.pid, signo);
}
static struct parallel_processes *pp_for_signal;
static void handle_children_on_signal(int signo)
{
kill_children(pp_for_signal, signo);
sigchain_pop(signo);
raise(signo);
}
static void pp_init(struct parallel_processes *pp,
int n,
get_next_task_fn get_next_task,
start_failure_fn start_failure,
task_finished_fn task_finished,
void *data)
{
int i;
if (n < 1)
n = online_cpus();
pp->max_processes = n;
trace_printf("run_processes_parallel: preparing to run up to %d tasks", n);
pp->data = data;
if (!get_next_task)
die("BUG: you need to specify a get_next_task function");
pp->get_next_task = get_next_task;
pp->start_failure = start_failure ? start_failure : default_start_failure;
pp->task_finished = task_finished ? task_finished : default_task_finished;
pp->nr_processes = 0;
pp->output_owner = 0;
pp->shutdown = 0;
pp->children = xcalloc(n, sizeof(*pp->children));
pp->pfd = xcalloc(n, sizeof(*pp->pfd));
strbuf_init(&pp->buffered_output, 0);
for (i = 0; i < n; i++) {
strbuf_init(&pp->children[i].err, 0);
child_process_init(&pp->children[i].process);
pp->pfd[i].events = POLLIN | POLLHUP;
pp->pfd[i].fd = -1;
}
pp_for_signal = pp;
sigchain_push_common(handle_children_on_signal);
}
static void pp_cleanup(struct parallel_processes *pp)
{
int i;
trace_printf("run_processes_parallel: done");
for (i = 0; i < pp->max_processes; i++) {
strbuf_release(&pp->children[i].err);
child_process_clear(&pp->children[i].process);
}
free(pp->children);
free(pp->pfd);
/*
* When get_next_task added messages to the buffer in its last
* iteration, the buffered output is non empty.
*/
fputs(pp->buffered_output.buf, stderr);
strbuf_release(&pp->buffered_output);
sigchain_pop_common();
}
/* returns
* 0 if a new task was started.
* 1 if no new jobs was started (get_next_task ran out of work, non critical
* problem with starting a new command)
* <0 no new job was started, user wishes to shutdown early. Use negative code
* to signal the children.
*/
static int pp_start_one(struct parallel_processes *pp)
{
int i, code;
for (i = 0; i < pp->max_processes; i++)
if (pp->children[i].state == GIT_CP_FREE)
break;
if (i == pp->max_processes)
die("BUG: bookkeeping is hard");
code = pp->get_next_task(&pp->children[i].process,
&pp->children[i].err,
pp->data,
&pp->children[i].data);
if (!code) {
strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
strbuf_reset(&pp->children[i].err);
return 1;
}
pp->children[i].process.err = -1;
pp->children[i].process.stdout_to_stderr = 1;
pp->children[i].process.no_stdin = 1;
if (start_command(&pp->children[i].process)) {
code = pp->start_failure(&pp->children[i].process,
&pp->children[i].err,
pp->data,
&pp->children[i].data);
strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
strbuf_reset(&pp->children[i].err);
if (code)
pp->shutdown = 1;
return code;
}
pp->nr_processes++;
pp->children[i].state = GIT_CP_WORKING;
pp->pfd[i].fd = pp->children[i].process.err;
return 0;
}
static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
{
int i;
while ((i = poll(pp->pfd, pp->max_processes, output_timeout)) < 0) {
if (errno == EINTR)
continue;
pp_cleanup(pp);
die_errno("poll");
}
/* Buffer output from all pipes. */
for (i = 0; i < pp->max_processes; i++) {
if (pp->children[i].state == GIT_CP_WORKING &&
pp->pfd[i].revents & (POLLIN | POLLHUP)) {
int n = strbuf_read_once(&pp->children[i].err,
pp->children[i].process.err, 0);
if (n == 0) {
close(pp->children[i].process.err);
pp->children[i].state = GIT_CP_WAIT_CLEANUP;
} else if (n < 0)
if (errno != EAGAIN)
die_errno("read");
}
}
}
static void pp_output(struct parallel_processes *pp)
{
int i = pp->output_owner;
if (pp->children[i].state == GIT_CP_WORKING &&
pp->children[i].err.len) {
fputs(pp->children[i].err.buf, stderr);
strbuf_reset(&pp->children[i].err);
}
}
static int pp_collect_finished(struct parallel_processes *pp)
{
int i, code;
int n = pp->max_processes;
int result = 0;
while (pp->nr_processes > 0) {
for (i = 0; i < pp->max_processes; i++)
if (pp->children[i].state == GIT_CP_WAIT_CLEANUP)
break;
if (i == pp->max_processes)
break;
code = finish_command(&pp->children[i].process);
code = pp->task_finished(code, &pp->children[i].process,
&pp->children[i].err, pp->data,
&pp->children[i].data);
if (code)
result = code;
if (code < 0)
break;
pp->nr_processes--;
pp->children[i].state = GIT_CP_FREE;
pp->pfd[i].fd = -1;
child_process_init(&pp->children[i].process);
if (i != pp->output_owner) {
strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
strbuf_reset(&pp->children[i].err);
} else {
fputs(pp->children[i].err.buf, stderr);
strbuf_reset(&pp->children[i].err);
/* Output all other finished child processes */
fputs(pp->buffered_output.buf, stderr);
strbuf_reset(&pp->buffered_output);
/*
* Pick next process to output live.
* NEEDSWORK:
* For now we pick it randomly by doing a round
* robin. Later we may want to pick the one with
* the most output or the longest or shortest
* running process time.
*/
for (i = 0; i < n; i++)
if (pp->children[(pp->output_owner + i) % n].state == GIT_CP_WORKING)
break;
pp->output_owner = (pp->output_owner + i) % n;
}
}
return result;
}
int run_processes_parallel(int n,
get_next_task_fn get_next_task,
start_failure_fn start_failure,
task_finished_fn task_finished,
void *pp_cb)
{
int i, code;
int output_timeout = 100;
int spawn_cap = 4;
struct parallel_processes pp;
pp_init(&pp, n, get_next_task, start_failure, task_finished, pp_cb);
while (1) {
for (i = 0;
i < spawn_cap && !pp.shutdown &&
pp.nr_processes < pp.max_processes;
i++) {
code = pp_start_one(&pp);
if (!code)
continue;
if (code < 0) {
pp.shutdown = 1;
kill_children(&pp, -code);
}
break;
}
if (!pp.nr_processes)
break;
pp_buffer_stderr(&pp, output_timeout);
pp_output(&pp);
code = pp_collect_finished(&pp);
if (code) {
pp.shutdown = 1;
if (code < 0)
kill_children(&pp, -code);
}
}
pp_cleanup(&pp);
return 0;
}

View File

@ -122,4 +122,84 @@ int start_async(struct async *async);
int finish_async(struct async *async); int finish_async(struct async *async);
int in_async(void); int in_async(void);
/**
* This callback should initialize the child process and preload the
* error channel if desired. The preloading of is useful if you want to
* have a message printed directly before the output of the child process.
* pp_cb is the callback cookie as passed to run_processes_parallel.
* You can store a child process specific callback cookie in pp_task_cb.
*
* Even after returning 0 to indicate that there are no more processes,
* this function will be called again until there are no more running
* child processes.
*
* Return 1 if the next child is ready to run.
* Return 0 if there are currently no more tasks to be processed.
* To send a signal to other child processes for abortion,
* return the negative signal number.
*/
typedef int (*get_next_task_fn)(struct child_process *cp,
struct strbuf *err,
void *pp_cb,
void **pp_task_cb);
/**
* This callback is called whenever there are problems starting
* a new process.
*
* You must not write to stdout or stderr in this function. Add your
* message to the strbuf err instead, which will be printed without
* messing up the output of the other parallel processes.
*
* pp_cb is the callback cookie as passed into run_processes_parallel,
* pp_task_cb is the callback cookie as passed into get_next_task_fn.
*
* Return 0 to continue the parallel processing. To abort return non zero.
* To send a signal to other child processes for abortion, return
* the negative signal number.
*/
typedef int (*start_failure_fn)(struct child_process *cp,
struct strbuf *err,
void *pp_cb,
void *pp_task_cb);
/**
* This callback is called on every child process that finished processing.
*
* You must not write to stdout or stderr in this function. Add your
* message to the strbuf err instead, which will be printed without
* messing up the output of the other parallel processes.
*
* pp_cb is the callback cookie as passed into run_processes_parallel,
* pp_task_cb is the callback cookie as passed into get_next_task_fn.
*
* Return 0 to continue the parallel processing. To abort return non zero.
* To send a signal to other child processes for abortion, return
* the negative signal number.
*/
typedef int (*task_finished_fn)(int result,
struct child_process *cp,
struct strbuf *err,
void *pp_cb,
void *pp_task_cb);
/**
* Runs up to n processes at the same time. Whenever a process can be
* started, the callback get_next_task_fn is called to obtain the data
* required to start another child process.
*
* The children started via this function run in parallel. Their output
* (both stdout and stderr) is routed to stderr in a manner that output
* from different tasks does not interleave.
*
* If start_failure_fn or task_finished_fn are NULL, default handlers
* will be used. The default handlers will print an error message on
* error without issuing an emergency stop.
*/
int run_processes_parallel(int n,
get_next_task_fn,
start_failure_fn,
task_finished_fn,
void *pp_cb);
#endif #endif

View File

@ -50,3 +50,12 @@ void sigchain_push_common(sigchain_fun f)
sigchain_push(SIGQUIT, f); sigchain_push(SIGQUIT, f);
sigchain_push(SIGPIPE, f); sigchain_push(SIGPIPE, f);
} }
void sigchain_pop_common(void)
{
sigchain_pop(SIGPIPE);
sigchain_pop(SIGQUIT);
sigchain_pop(SIGTERM);
sigchain_pop(SIGHUP);
sigchain_pop(SIGINT);
}

View File

@ -7,5 +7,6 @@ int sigchain_push(int sig, sigchain_fun f);
int sigchain_pop(int sig); int sigchain_pop(int sig);
void sigchain_push_common(sigchain_fun f); void sigchain_push_common(sigchain_fun f);
void sigchain_pop_common(void);
#endif /* SIGCHAIN_H */ #endif /* SIGCHAIN_H */

View File

@ -384,6 +384,17 @@ ssize_t strbuf_read(struct strbuf *sb, int fd, size_t hint)
return sb->len - oldlen; return sb->len - oldlen;
} }
ssize_t strbuf_read_once(struct strbuf *sb, int fd, size_t hint)
{
ssize_t cnt;
strbuf_grow(sb, hint ? hint : 8192);
cnt = xread(fd, sb->buf + sb->len, sb->alloc - sb->len - 1);
if (cnt > 0)
strbuf_setlen(sb, sb->len + cnt);
return cnt;
}
#define STRBUF_MAXLINK (2*PATH_MAX) #define STRBUF_MAXLINK (2*PATH_MAX)
int strbuf_readlink(struct strbuf *sb, const char *path, size_t hint) int strbuf_readlink(struct strbuf *sb, const char *path, size_t hint)

View File

@ -366,6 +366,14 @@ extern size_t strbuf_fread(struct strbuf *, size_t, FILE *);
*/ */
extern ssize_t strbuf_read(struct strbuf *, int fd, size_t hint); extern ssize_t strbuf_read(struct strbuf *, int fd, size_t hint);
/**
* Read the contents of a given file descriptor partially by using only one
* attempt of xread. The third argument can be used to give a hint about the
* file size, to avoid reallocs. Returns the number of new bytes appended to
* the sb.
*/
extern ssize_t strbuf_read_once(struct strbuf *, int fd, size_t hint);
/** /**
* Read the contents of a file, specified by its path. The third argument * Read the contents of a file, specified by its path. The third argument
* can be used to give a hint about the file size, to avoid reallocs. * can be used to give a hint about the file size, to avoid reallocs.

View File

@ -12,6 +12,7 @@
#include "sha1-array.h" #include "sha1-array.h"
#include "argv-array.h" #include "argv-array.h"
#include "blob.h" #include "blob.h"
#include "thread-utils.h"
static int config_fetch_recurse_submodules = RECURSE_SUBMODULES_ON_DEMAND; static int config_fetch_recurse_submodules = RECURSE_SUBMODULES_ON_DEMAND;
static struct string_list changed_submodule_paths; static struct string_list changed_submodule_paths;
@ -610,37 +611,28 @@ static void calculate_changed_submodule_paths(void)
initialized_fetch_ref_tips = 0; initialized_fetch_ref_tips = 0;
} }
int fetch_populated_submodules(const struct argv_array *options, struct submodule_parallel_fetch {
const char *prefix, int command_line_option, int count;
int quiet) struct argv_array args;
const char *work_tree;
const char *prefix;
int command_line_option;
int quiet;
int result;
};
#define SPF_INIT {0, ARGV_ARRAY_INIT, NULL, NULL, 0, 0, 0}
static int get_next_submodule(struct child_process *cp,
struct strbuf *err, void *data, void **task_cb)
{ {
int i, result = 0; int ret = 0;
struct child_process cp = CHILD_PROCESS_INIT; struct submodule_parallel_fetch *spf = data;
struct argv_array argv = ARGV_ARRAY_INIT;
const char *work_tree = get_git_work_tree();
if (!work_tree)
goto out;
if (read_cache() < 0) for (; spf->count < active_nr; spf->count++) {
die("index file corrupt");
argv_array_push(&argv, "fetch");
for (i = 0; i < options->argc; i++)
argv_array_push(&argv, options->argv[i]);
argv_array_push(&argv, "--recurse-submodules-default");
/* default value, "--submodule-prefix" and its value are added later */
cp.env = local_repo_env;
cp.git_cmd = 1;
cp.no_stdin = 1;
calculate_changed_submodule_paths();
for (i = 0; i < active_nr; i++) {
struct strbuf submodule_path = STRBUF_INIT; struct strbuf submodule_path = STRBUF_INIT;
struct strbuf submodule_git_dir = STRBUF_INIT; struct strbuf submodule_git_dir = STRBUF_INIT;
struct strbuf submodule_prefix = STRBUF_INIT; struct strbuf submodule_prefix = STRBUF_INIT;
const struct cache_entry *ce = active_cache[i]; const struct cache_entry *ce = active_cache[spf->count];
const char *git_dir, *default_argv; const char *git_dir, *default_argv;
const struct submodule *submodule; const struct submodule *submodule;
@ -652,7 +644,7 @@ int fetch_populated_submodules(const struct argv_array *options,
submodule = submodule_from_name(null_sha1, ce->name); submodule = submodule_from_name(null_sha1, ce->name);
default_argv = "yes"; default_argv = "yes";
if (command_line_option == RECURSE_SUBMODULES_DEFAULT) { if (spf->command_line_option == RECURSE_SUBMODULES_DEFAULT) {
if (submodule && if (submodule &&
submodule->fetch_recurse != submodule->fetch_recurse !=
RECURSE_SUBMODULES_NONE) { RECURSE_SUBMODULES_NONE) {
@ -675,40 +667,101 @@ int fetch_populated_submodules(const struct argv_array *options,
default_argv = "on-demand"; default_argv = "on-demand";
} }
} }
} else if (command_line_option == RECURSE_SUBMODULES_ON_DEMAND) { } else if (spf->command_line_option == RECURSE_SUBMODULES_ON_DEMAND) {
if (!unsorted_string_list_lookup(&changed_submodule_paths, ce->name)) if (!unsorted_string_list_lookup(&changed_submodule_paths, ce->name))
continue; continue;
default_argv = "on-demand"; default_argv = "on-demand";
} }
strbuf_addf(&submodule_path, "%s/%s", work_tree, ce->name); strbuf_addf(&submodule_path, "%s/%s", spf->work_tree, ce->name);
strbuf_addf(&submodule_git_dir, "%s/.git", submodule_path.buf); strbuf_addf(&submodule_git_dir, "%s/.git", submodule_path.buf);
strbuf_addf(&submodule_prefix, "%s%s/", prefix, ce->name); strbuf_addf(&submodule_prefix, "%s%s/", spf->prefix, ce->name);
git_dir = read_gitfile(submodule_git_dir.buf); git_dir = read_gitfile(submodule_git_dir.buf);
if (!git_dir) if (!git_dir)
git_dir = submodule_git_dir.buf; git_dir = submodule_git_dir.buf;
if (is_directory(git_dir)) { if (is_directory(git_dir)) {
if (!quiet) child_process_init(cp);
printf("Fetching submodule %s%s\n", prefix, ce->name); cp->dir = strbuf_detach(&submodule_path, NULL);
cp.dir = submodule_path.buf; cp->env = local_repo_env;
argv_array_push(&argv, default_argv); cp->git_cmd = 1;
argv_array_push(&argv, "--submodule-prefix"); if (!spf->quiet)
argv_array_push(&argv, submodule_prefix.buf); strbuf_addf(err, "Fetching submodule %s%s\n",
cp.argv = argv.argv; spf->prefix, ce->name);
if (run_command(&cp)) argv_array_init(&cp->args);
result = 1; argv_array_pushv(&cp->args, spf->args.argv);
argv_array_pop(&argv); argv_array_push(&cp->args, default_argv);
argv_array_pop(&argv); argv_array_push(&cp->args, "--submodule-prefix");
argv_array_pop(&argv); argv_array_push(&cp->args, submodule_prefix.buf);
ret = 1;
} }
strbuf_release(&submodule_path); strbuf_release(&submodule_path);
strbuf_release(&submodule_git_dir); strbuf_release(&submodule_git_dir);
strbuf_release(&submodule_prefix); strbuf_release(&submodule_prefix);
if (ret) {
spf->count++;
return 1;
} }
argv_array_clear(&argv); }
return 0;
}
static int fetch_start_failure(struct child_process *cp,
struct strbuf *err,
void *cb, void *task_cb)
{
struct submodule_parallel_fetch *spf = cb;
spf->result = 1;
return 0;
}
static int fetch_finish(int retvalue, struct child_process *cp,
struct strbuf *err, void *cb, void *task_cb)
{
struct submodule_parallel_fetch *spf = cb;
if (retvalue)
spf->result = 1;
return 0;
}
int fetch_populated_submodules(const struct argv_array *options,
const char *prefix, int command_line_option,
int quiet, int max_parallel_jobs)
{
int i;
struct submodule_parallel_fetch spf = SPF_INIT;
spf.work_tree = get_git_work_tree();
spf.command_line_option = command_line_option;
spf.quiet = quiet;
spf.prefix = prefix;
if (!spf.work_tree)
goto out;
if (read_cache() < 0)
die("index file corrupt");
argv_array_push(&spf.args, "fetch");
for (i = 0; i < options->argc; i++)
argv_array_push(&spf.args, options->argv[i]);
argv_array_push(&spf.args, "--recurse-submodules-default");
/* default value, "--submodule-prefix" and its value are added later */
calculate_changed_submodule_paths();
run_processes_parallel(max_parallel_jobs,
get_next_submodule,
fetch_start_failure,
fetch_finish,
&spf);
argv_array_clear(&spf.args);
out: out:
string_list_clear(&changed_submodule_paths, 1); string_list_clear(&changed_submodule_paths, 1);
return result; return spf.result;
} }
unsigned is_submodule_modified(const char *path, int ignore_untracked) unsigned is_submodule_modified(const char *path, int ignore_untracked)

View File

@ -32,7 +32,7 @@ void set_config_fetch_recurse_submodules(int value);
void check_for_new_submodule_commits(unsigned char new_sha1[20]); void check_for_new_submodule_commits(unsigned char new_sha1[20]);
int fetch_populated_submodules(const struct argv_array *options, int fetch_populated_submodules(const struct argv_array *options,
const char *prefix, int command_line_option, const char *prefix, int command_line_option,
int quiet); int quiet, int max_parallel_jobs);
unsigned is_submodule_modified(const char *path, int ignore_untracked); unsigned is_submodule_modified(const char *path, int ignore_untracked);
int submodule_uses_gitfile(const char *path); int submodule_uses_gitfile(const char *path);
int ok_to_remove_submodule(const char *path); int ok_to_remove_submodule(const char *path);

View File

@ -47,4 +47,57 @@ test_expect_success POSIXPERM,SANITY 'unreadable directory in PATH' '
test_cmp expect actual test_cmp expect actual
' '
cat >expect <<-EOF
preloaded output of a child
Hello
World
preloaded output of a child
Hello
World
preloaded output of a child
Hello
World
preloaded output of a child
Hello
World
EOF
test_expect_success 'run_command runs in parallel with more jobs available than tasks' '
test-run-command run-command-parallel 5 sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
test_cmp expect actual
'
test_expect_success 'run_command runs in parallel with as many jobs as tasks' '
test-run-command run-command-parallel 4 sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
test_cmp expect actual
'
test_expect_success 'run_command runs in parallel with more tasks than jobs available' '
test-run-command run-command-parallel 3 sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
test_cmp expect actual
'
cat >expect <<-EOF
preloaded output of a child
asking for a quick stop
preloaded output of a child
asking for a quick stop
preloaded output of a child
asking for a quick stop
EOF
test_expect_success 'run_command is asked to abort gracefully' '
test-run-command run-command-abort 3 false 2>actual &&
test_cmp expect actual
'
cat >expect <<-EOF
no further jobs available
EOF
test_expect_success 'run_command outputs ' '
test-run-command run-command-no-jobs 3 sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
test_cmp expect actual
'
test_done test_done

View File

@ -16,7 +16,8 @@ add_upstream_commit() {
git add subfile && git add subfile &&
git commit -m new subfile && git commit -m new subfile &&
head2=$(git rev-parse --short HEAD) && head2=$(git rev-parse --short HEAD) &&
echo "From $pwd/submodule" > ../expect.err && echo "Fetching submodule submodule" > ../expect.err &&
echo "From $pwd/submodule" >> ../expect.err &&
echo " $head1..$head2 master -> origin/master" >> ../expect.err echo " $head1..$head2 master -> origin/master" >> ../expect.err
) && ) &&
( (
@ -27,6 +28,7 @@ add_upstream_commit() {
git add deepsubfile && git add deepsubfile &&
git commit -m new deepsubfile && git commit -m new deepsubfile &&
head2=$(git rev-parse --short HEAD) && head2=$(git rev-parse --short HEAD) &&
echo "Fetching submodule submodule/subdir/deepsubmodule" >> ../expect.err
echo "From $pwd/deepsubmodule" >> ../expect.err && echo "From $pwd/deepsubmodule" >> ../expect.err &&
echo " $head1..$head2 master -> origin/master" >> ../expect.err echo " $head1..$head2 master -> origin/master" >> ../expect.err
) )
@ -56,9 +58,7 @@ test_expect_success setup '
( (
cd downstream && cd downstream &&
git submodule update --init --recursive git submodule update --init --recursive
) && )
echo "Fetching submodule submodule" > expect.out &&
echo "Fetching submodule submodule/subdir/deepsubmodule" >> expect.out
' '
test_expect_success "fetch --recurse-submodules recurses into submodules" ' test_expect_success "fetch --recurse-submodules recurses into submodules" '
@ -67,10 +67,21 @@ test_expect_success "fetch --recurse-submodules recurses into submodules" '
cd downstream && cd downstream &&
git fetch --recurse-submodules >../actual.out 2>../actual.err git fetch --recurse-submodules >../actual.out 2>../actual.err
) && ) &&
test_i18ncmp expect.out actual.out && test_must_be_empty actual.out &&
test_i18ncmp expect.err actual.err test_i18ncmp expect.err actual.err
' '
test_expect_success "fetch --recurse-submodules -j2 has the same output behaviour" '
add_upstream_commit &&
(
cd downstream &&
GIT_TRACE=$(pwd)/../trace.out git fetch --recurse-submodules -j2 2>../actual.err
) &&
test_must_be_empty actual.out &&
test_i18ncmp expect.err actual.err &&
grep "2 tasks" trace.out
'
test_expect_success "fetch alone only fetches superproject" ' test_expect_success "fetch alone only fetches superproject" '
add_upstream_commit && add_upstream_commit &&
( (
@ -96,7 +107,7 @@ test_expect_success "using fetchRecurseSubmodules=true in .gitmodules recurses i
git config -f .gitmodules submodule.submodule.fetchRecurseSubmodules true && git config -f .gitmodules submodule.submodule.fetchRecurseSubmodules true &&
git fetch >../actual.out 2>../actual.err git fetch >../actual.out 2>../actual.err
) && ) &&
test_i18ncmp expect.out actual.out && test_must_be_empty actual.out &&
test_i18ncmp expect.err actual.err test_i18ncmp expect.err actual.err
' '
@ -127,7 +138,7 @@ test_expect_success "--recurse-submodules overrides fetchRecurseSubmodules setti
git config --unset -f .gitmodules submodule.submodule.fetchRecurseSubmodules && git config --unset -f .gitmodules submodule.submodule.fetchRecurseSubmodules &&
git config --unset submodule.submodule.fetchRecurseSubmodules git config --unset submodule.submodule.fetchRecurseSubmodules
) && ) &&
test_i18ncmp expect.out actual.out && test_must_be_empty actual.out &&
test_i18ncmp expect.err actual.err test_i18ncmp expect.err actual.err
' '
@ -140,13 +151,22 @@ test_expect_success "--quiet propagates to submodules" '
! test -s actual.err ! test -s actual.err
' '
test_expect_success "--quiet propagates to parallel submodules" '
(
cd downstream &&
git fetch --recurse-submodules -j 2 --quiet >../actual.out 2>../actual.err
) &&
! test -s actual.out &&
! test -s actual.err
'
test_expect_success "--dry-run propagates to submodules" ' test_expect_success "--dry-run propagates to submodules" '
add_upstream_commit && add_upstream_commit &&
( (
cd downstream && cd downstream &&
git fetch --recurse-submodules --dry-run >../actual.out 2>../actual.err git fetch --recurse-submodules --dry-run >../actual.out 2>../actual.err
) && ) &&
test_i18ncmp expect.out actual.out && test_must_be_empty actual.out &&
test_i18ncmp expect.err actual.err test_i18ncmp expect.err actual.err
' '
@ -155,7 +175,7 @@ test_expect_success "Without --dry-run propagates to submodules" '
cd downstream && cd downstream &&
git fetch --recurse-submodules >../actual.out 2>../actual.err git fetch --recurse-submodules >../actual.out 2>../actual.err
) && ) &&
test_i18ncmp expect.out actual.out && test_must_be_empty actual.out &&
test_i18ncmp expect.err actual.err test_i18ncmp expect.err actual.err
' '
@ -166,7 +186,7 @@ test_expect_success "recurseSubmodules=true propagates into submodules" '
git config fetch.recurseSubmodules true git config fetch.recurseSubmodules true
git fetch >../actual.out 2>../actual.err git fetch >../actual.out 2>../actual.err
) && ) &&
test_i18ncmp expect.out actual.out && test_must_be_empty actual.out &&
test_i18ncmp expect.err actual.err test_i18ncmp expect.err actual.err
' '
@ -180,7 +200,7 @@ test_expect_success "--recurse-submodules overrides config in submodule" '
) && ) &&
git fetch --recurse-submodules >../actual.out 2>../actual.err git fetch --recurse-submodules >../actual.out 2>../actual.err
) && ) &&
test_i18ncmp expect.out actual.out && test_must_be_empty actual.out &&
test_i18ncmp expect.err actual.err test_i18ncmp expect.err actual.err
' '
@ -214,16 +234,15 @@ test_expect_success "Recursion stops when no new submodule commits are fetched"
git add submodule && git add submodule &&
git commit -m "new submodule" && git commit -m "new submodule" &&
head2=$(git rev-parse --short HEAD) && head2=$(git rev-parse --short HEAD) &&
echo "Fetching submodule submodule" > expect.out.sub &&
echo "From $pwd/." > expect.err.sub && echo "From $pwd/." > expect.err.sub &&
echo " $head1..$head2 master -> origin/master" >>expect.err.sub && echo " $head1..$head2 master -> origin/master" >>expect.err.sub &&
head -2 expect.err >> expect.err.sub && head -3 expect.err >> expect.err.sub &&
( (
cd downstream && cd downstream &&
git fetch >../actual.out 2>../actual.err git fetch >../actual.out 2>../actual.err
) && ) &&
test_i18ncmp expect.err.sub actual.err && test_i18ncmp expect.err.sub actual.err &&
test_i18ncmp expect.out.sub actual.out test_must_be_empty actual.out
' '
test_expect_success "Recursion doesn't happen when new superproject commits don't change any submodules" ' test_expect_success "Recursion doesn't happen when new superproject commits don't change any submodules" '
@ -269,7 +288,7 @@ test_expect_success "Recursion picks up config in submodule" '
) )
) && ) &&
test_i18ncmp expect.err.sub actual.err && test_i18ncmp expect.err.sub actual.err &&
test_i18ncmp expect.out actual.out test_must_be_empty actual.out
' '
test_expect_success "Recursion picks up all submodules when necessary" ' test_expect_success "Recursion picks up all submodules when necessary" '
@ -285,7 +304,8 @@ test_expect_success "Recursion picks up all submodules when necessary" '
git add subdir/deepsubmodule && git add subdir/deepsubmodule &&
git commit -m "new deepsubmodule" git commit -m "new deepsubmodule"
head2=$(git rev-parse --short HEAD) && head2=$(git rev-parse --short HEAD) &&
echo "From $pwd/submodule" > ../expect.err.sub && echo "Fetching submodule submodule" > ../expect.err.sub &&
echo "From $pwd/submodule" >> ../expect.err.sub &&
echo " $head1..$head2 master -> origin/master" >> ../expect.err.sub echo " $head1..$head2 master -> origin/master" >> ../expect.err.sub
) && ) &&
head1=$(git rev-parse --short HEAD) && head1=$(git rev-parse --short HEAD) &&
@ -295,13 +315,13 @@ test_expect_success "Recursion picks up all submodules when necessary" '
echo "From $pwd/." > expect.err.2 && echo "From $pwd/." > expect.err.2 &&
echo " $head1..$head2 master -> origin/master" >> expect.err.2 && echo " $head1..$head2 master -> origin/master" >> expect.err.2 &&
cat expect.err.sub >> expect.err.2 && cat expect.err.sub >> expect.err.2 &&
tail -2 expect.err >> expect.err.2 && tail -3 expect.err >> expect.err.2 &&
( (
cd downstream && cd downstream &&
git fetch >../actual.out 2>../actual.err git fetch >../actual.out 2>../actual.err
) && ) &&
test_i18ncmp expect.err.2 actual.err && test_i18ncmp expect.err.2 actual.err &&
test_i18ncmp expect.out actual.out test_must_be_empty actual.out
' '
test_expect_success "'--recurse-submodules=on-demand' doesn't recurse when no new commits are fetched in the superproject (and ignores config)" ' test_expect_success "'--recurse-submodules=on-demand' doesn't recurse when no new commits are fetched in the superproject (and ignores config)" '
@ -317,7 +337,8 @@ test_expect_success "'--recurse-submodules=on-demand' doesn't recurse when no ne
git add subdir/deepsubmodule && git add subdir/deepsubmodule &&
git commit -m "new deepsubmodule" && git commit -m "new deepsubmodule" &&
head2=$(git rev-parse --short HEAD) && head2=$(git rev-parse --short HEAD) &&
echo "From $pwd/submodule" > ../expect.err.sub && echo Fetching submodule submodule > ../expect.err.sub &&
echo "From $pwd/submodule" >> ../expect.err.sub &&
echo " $head1..$head2 master -> origin/master" >> ../expect.err.sub echo " $head1..$head2 master -> origin/master" >> ../expect.err.sub
) && ) &&
( (
@ -335,7 +356,7 @@ test_expect_success "'--recurse-submodules=on-demand' recurses as deep as necess
git add submodule && git add submodule &&
git commit -m "new submodule" && git commit -m "new submodule" &&
head2=$(git rev-parse --short HEAD) && head2=$(git rev-parse --short HEAD) &&
tail -2 expect.err > expect.err.deepsub && tail -3 expect.err > expect.err.deepsub &&
echo "From $pwd/." > expect.err && echo "From $pwd/." > expect.err &&
echo " $head1..$head2 master -> origin/master" >>expect.err && echo " $head1..$head2 master -> origin/master" >>expect.err &&
cat expect.err.sub >> expect.err && cat expect.err.sub >> expect.err &&
@ -354,7 +375,7 @@ test_expect_success "'--recurse-submodules=on-demand' recurses as deep as necess
git config --unset -f .gitmodules submodule.subdir/deepsubmodule.fetchRecursive git config --unset -f .gitmodules submodule.subdir/deepsubmodule.fetchRecursive
) )
) && ) &&
test_i18ncmp expect.out actual.out && test_must_be_empty actual.out &&
test_i18ncmp expect.err actual.err test_i18ncmp expect.err actual.err
' '
@ -388,7 +409,7 @@ test_expect_success "'fetch.recurseSubmodules=on-demand' overrides global config
head2=$(git rev-parse --short HEAD) && head2=$(git rev-parse --short HEAD) &&
echo "From $pwd/." > expect.err.2 && echo "From $pwd/." > expect.err.2 &&
echo " $head1..$head2 master -> origin/master" >>expect.err.2 && echo " $head1..$head2 master -> origin/master" >>expect.err.2 &&
head -2 expect.err >> expect.err.2 && head -3 expect.err >> expect.err.2 &&
( (
cd downstream && cd downstream &&
git config fetch.recurseSubmodules on-demand && git config fetch.recurseSubmodules on-demand &&
@ -399,7 +420,7 @@ test_expect_success "'fetch.recurseSubmodules=on-demand' overrides global config
cd downstream && cd downstream &&
git config --unset fetch.recurseSubmodules git config --unset fetch.recurseSubmodules
) && ) &&
test_i18ncmp expect.out.sub actual.out && test_must_be_empty actual.out &&
test_i18ncmp expect.err.2 actual.err test_i18ncmp expect.err.2 actual.err
' '
@ -416,7 +437,7 @@ test_expect_success "'submodule.<sub>.fetchRecurseSubmodules=on-demand' override
head2=$(git rev-parse --short HEAD) && head2=$(git rev-parse --short HEAD) &&
echo "From $pwd/." > expect.err.2 && echo "From $pwd/." > expect.err.2 &&
echo " $head1..$head2 master -> origin/master" >>expect.err.2 && echo " $head1..$head2 master -> origin/master" >>expect.err.2 &&
head -2 expect.err >> expect.err.2 && head -3 expect.err >> expect.err.2 &&
( (
cd downstream && cd downstream &&
git config submodule.submodule.fetchRecurseSubmodules on-demand && git config submodule.submodule.fetchRecurseSubmodules on-demand &&
@ -427,7 +448,7 @@ test_expect_success "'submodule.<sub>.fetchRecurseSubmodules=on-demand' override
cd downstream && cd downstream &&
git config --unset submodule.submodule.fetchRecurseSubmodules git config --unset submodule.submodule.fetchRecurseSubmodules
) && ) &&
test_i18ncmp expect.out.sub actual.out && test_must_be_empty actual.out &&
test_i18ncmp expect.err.2 actual.err test_i18ncmp expect.err.2 actual.err
' '

View File

@ -10,12 +10,50 @@
#include "git-compat-util.h" #include "git-compat-util.h"
#include "run-command.h" #include "run-command.h"
#include "argv-array.h"
#include "strbuf.h"
#include <string.h> #include <string.h>
#include <errno.h> #include <errno.h>
static int number_callbacks;
static int parallel_next(struct child_process *cp,
struct strbuf *err,
void *cb,
void **task_cb)
{
struct child_process *d = cb;
if (number_callbacks >= 4)
return 0;
argv_array_pushv(&cp->args, d->argv);
strbuf_addf(err, "preloaded output of a child\n");
number_callbacks++;
return 1;
}
static int no_job(struct child_process *cp,
struct strbuf *err,
void *cb,
void **task_cb)
{
strbuf_addf(err, "no further jobs available\n");
return 0;
}
static int task_finished(int result,
struct child_process *cp,
struct strbuf *err,
void *pp_cb,
void *pp_task_cb)
{
strbuf_addf(err, "asking for a quick stop\n");
return 1;
}
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
struct child_process proc = CHILD_PROCESS_INIT; struct child_process proc = CHILD_PROCESS_INIT;
int jobs;
if (argc < 3) if (argc < 3)
return 1; return 1;
@ -30,6 +68,21 @@ int main(int argc, char **argv)
if (!strcmp(argv[1], "run-command")) if (!strcmp(argv[1], "run-command"))
exit(run_command(&proc)); exit(run_command(&proc));
jobs = atoi(argv[2]);
proc.argv = (const char **)argv + 3;
if (!strcmp(argv[1], "run-command-parallel"))
exit(run_processes_parallel(jobs, parallel_next,
NULL, NULL, &proc));
if (!strcmp(argv[1], "run-command-abort"))
exit(run_processes_parallel(jobs, parallel_next,
NULL, task_finished, &proc));
if (!strcmp(argv[1], "run-command-no-jobs"))
exit(run_processes_parallel(jobs, no_job,
NULL, task_finished, &proc));
fprintf(stderr, "check usage\n"); fprintf(stderr, "check usage\n");
return 1; return 1;
} }

View File

@ -236,8 +236,24 @@ ssize_t xread(int fd, void *buf, size_t len)
len = MAX_IO_SIZE; len = MAX_IO_SIZE;
while (1) { while (1) {
nr = read(fd, buf, len); nr = read(fd, buf, len);
if ((nr < 0) && (errno == EAGAIN || errno == EINTR)) if (nr < 0) {
if (errno == EINTR)
continue; continue;
if (errno == EAGAIN || errno == EWOULDBLOCK) {
struct pollfd pfd;
pfd.events = POLLIN;
pfd.fd = fd;
/*
* it is OK if this poll() failed; we
* want to leave this infinite loop
* only when read() returns with
* success, or an expected failure,
* which would be checked by the next
* call to read(2).
*/
poll(&pfd, 1, -1);
}
}
return nr; return nr;
} }
} }