run-command: support custom fd-set in async
This patch adds the possibility to supply a set of non-0 file descriptors for async process communication instead of the default-created pipe. Additionally, we now support bi-directional communiction with the async procedure, by giving the async function both read and write file descriptors. To retain compatiblity and similar "API feel" with start_command, we require start_async callers to set .out = -1 to get a readable file descriptor. If either of .in or .out is 0, we supply no file descriptor to the async process. [sp: Note: Erik started this patch, and a huge bulk of it is his work. All bugs were introduced later by Shawn.] Signed-off-by: Erik Faye-Lund <kusmabite@gmail.com> Signed-off-by: Shawn O. Pearce <spearce@spearce.org> Signed-off-by: Junio C Hamano <gitster@pobox.com>
This commit is contained in:
parent
4f41b61148
commit
ae6a5609c0
@ -64,8 +64,8 @@ The functions above do the following:
|
|||||||
`start_async`::
|
`start_async`::
|
||||||
|
|
||||||
Run a function asynchronously. Takes a pointer to a `struct
|
Run a function asynchronously. Takes a pointer to a `struct
|
||||||
async` that specifies the details and returns a pipe FD
|
async` that specifies the details and returns a set of pipe FDs
|
||||||
from which the caller reads. See below for details.
|
for communication with the function. See below for details.
|
||||||
|
|
||||||
`finish_async`::
|
`finish_async`::
|
||||||
|
|
||||||
@ -180,17 +180,47 @@ The caller:
|
|||||||
struct async variable;
|
struct async variable;
|
||||||
2. initializes .proc and .data;
|
2. initializes .proc and .data;
|
||||||
3. calls start_async();
|
3. calls start_async();
|
||||||
4. processes the data by reading from the fd in .out;
|
4. processes communicates with proc through .in and .out;
|
||||||
5. closes .out;
|
5. closes .in and .out;
|
||||||
6. calls finish_async().
|
6. calls finish_async().
|
||||||
|
|
||||||
|
The members .in, .out are used to provide a set of fd's for
|
||||||
|
communication between the caller and the callee as follows:
|
||||||
|
|
||||||
|
. Specify 0 to have no file descriptor passed. The callee will
|
||||||
|
receive -1 in the corresponding argument.
|
||||||
|
|
||||||
|
. Specify < 0 to have a pipe allocated; start_async() replaces
|
||||||
|
with the pipe FD in the following way:
|
||||||
|
|
||||||
|
.in: Returns the writable pipe end into which the caller
|
||||||
|
writes; the readable end of the pipe becomes the function's
|
||||||
|
in argument.
|
||||||
|
|
||||||
|
.out: Returns the readable pipe end from which the caller
|
||||||
|
reads; the writable end of the pipe becomes the function's
|
||||||
|
out argument.
|
||||||
|
|
||||||
|
The caller of start_async() must close the returned FDs after it
|
||||||
|
has completed reading from/writing from them.
|
||||||
|
|
||||||
|
. Specify a file descriptor > 0 to be used by the function:
|
||||||
|
|
||||||
|
.in: The FD must be readable; it becomes the function's in.
|
||||||
|
.out: The FD must be writable; it becomes the function's out.
|
||||||
|
|
||||||
|
The specified FD is closed by start_async(), even if it fails to
|
||||||
|
run the function.
|
||||||
|
|
||||||
The function pointer in .proc has the following signature:
|
The function pointer in .proc has the following signature:
|
||||||
|
|
||||||
int proc(int fd, void *data);
|
int proc(int in, int out, void *data);
|
||||||
|
|
||||||
. fd specifies a writable file descriptor to which the function must
|
. in, out specifies a set of file descriptors to which the function
|
||||||
write the data that it produces. The function *must* close this
|
must read/write the data that it needs/produces. The function
|
||||||
descriptor before it returns.
|
*must* close these descriptors before it returns. A descriptor
|
||||||
|
may be -1 if the caller did not configure a descriptor for that
|
||||||
|
direction.
|
||||||
|
|
||||||
. data is the value that the caller has specified in the .data member
|
. data is the value that the caller has specified in the .data member
|
||||||
of struct async.
|
of struct async.
|
||||||
@ -205,8 +235,8 @@ because this facility is implemented by a pipe to a forked process on
|
|||||||
UNIX, but by a thread in the same address space on Windows:
|
UNIX, but by a thread in the same address space on Windows:
|
||||||
|
|
||||||
. It cannot change the program's state (global variables, environment,
|
. It cannot change the program's state (global variables, environment,
|
||||||
etc.) in a way that the caller notices; in other words, .out is the
|
etc.) in a way that the caller notices; in other words, .in and .out
|
||||||
only communication channel to the caller.
|
are the only communication channels to the caller.
|
||||||
|
|
||||||
. It must not change the program's state that the caller of the
|
. It must not change the program's state that the caller of the
|
||||||
facility also uses.
|
facility also uses.
|
||||||
|
@ -586,12 +586,12 @@ static int everything_local(struct ref **refs, int nr_match, char **match)
|
|||||||
return retval;
|
return retval;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int sideband_demux(int fd, void *data)
|
static int sideband_demux(int in, int out, void *data)
|
||||||
{
|
{
|
||||||
int *xd = data;
|
int *xd = data;
|
||||||
|
|
||||||
int ret = recv_sideband("fetch-pack", xd[0], fd);
|
int ret = recv_sideband("fetch-pack", xd[0], out);
|
||||||
close(fd);
|
close(out);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -613,6 +613,7 @@ static int get_pack(int xd[2], char **pack_lockfile)
|
|||||||
*/
|
*/
|
||||||
demux.proc = sideband_demux;
|
demux.proc = sideband_demux;
|
||||||
demux.data = xd;
|
demux.data = xd;
|
||||||
|
demux.out = -1;
|
||||||
if (start_async(&demux))
|
if (start_async(&demux))
|
||||||
die("fetch-pack: unable to fork off sideband"
|
die("fetch-pack: unable to fork off sideband"
|
||||||
" demultiplexer");
|
" demultiplexer");
|
||||||
|
@ -241,7 +241,7 @@ struct filter_params {
|
|||||||
const char *cmd;
|
const char *cmd;
|
||||||
};
|
};
|
||||||
|
|
||||||
static int filter_buffer(int fd, void *data)
|
static int filter_buffer(int in, int out, void *data)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Spawn cmd and feed the buffer contents through its stdin.
|
* Spawn cmd and feed the buffer contents through its stdin.
|
||||||
@ -254,7 +254,7 @@ static int filter_buffer(int fd, void *data)
|
|||||||
memset(&child_process, 0, sizeof(child_process));
|
memset(&child_process, 0, sizeof(child_process));
|
||||||
child_process.argv = argv;
|
child_process.argv = argv;
|
||||||
child_process.in = -1;
|
child_process.in = -1;
|
||||||
child_process.out = fd;
|
child_process.out = out;
|
||||||
|
|
||||||
if (start_command(&child_process))
|
if (start_command(&child_process))
|
||||||
return error("cannot fork to run external filter %s", params->cmd);
|
return error("cannot fork to run external filter %s", params->cmd);
|
||||||
@ -291,6 +291,7 @@ static int apply_filter(const char *path, const char *src, size_t len,
|
|||||||
memset(&async, 0, sizeof(async));
|
memset(&async, 0, sizeof(async));
|
||||||
async.proc = filter_buffer;
|
async.proc = filter_buffer;
|
||||||
async.data = ¶ms;
|
async.data = ¶ms;
|
||||||
|
async.out = -1;
|
||||||
params.src = src;
|
params.src = src;
|
||||||
params.size = len;
|
params.size = len;
|
||||||
params.cmd = cmd;
|
params.cmd = cmd;
|
||||||
|
@ -184,13 +184,13 @@ static struct discovery* discover_refs(const char *service)
|
|||||||
return last;
|
return last;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int write_discovery(int fd, void *data)
|
static int write_discovery(int in, int out, void *data)
|
||||||
{
|
{
|
||||||
struct discovery *heads = data;
|
struct discovery *heads = data;
|
||||||
int err = 0;
|
int err = 0;
|
||||||
if (write_in_full(fd, heads->buf, heads->len) != heads->len)
|
if (write_in_full(out, heads->buf, heads->len) != heads->len)
|
||||||
err = 1;
|
err = 1;
|
||||||
close(fd);
|
close(out);
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -202,6 +202,7 @@ static struct ref *parse_git_refs(struct discovery *heads)
|
|||||||
memset(&async, 0, sizeof(async));
|
memset(&async, 0, sizeof(async));
|
||||||
async.proc = write_discovery;
|
async.proc = write_discovery;
|
||||||
async.data = heads;
|
async.data = heads;
|
||||||
|
async.out = -1;
|
||||||
|
|
||||||
if (start_async(&async))
|
if (start_async(&async))
|
||||||
die("cannot start thread to parse advertised refs");
|
die("cannot start thread to parse advertised refs");
|
||||||
|
@ -327,17 +327,51 @@ int run_command_v_opt_cd_env(const char **argv, int opt, const char *dir, const
|
|||||||
static unsigned __stdcall run_thread(void *data)
|
static unsigned __stdcall run_thread(void *data)
|
||||||
{
|
{
|
||||||
struct async *async = data;
|
struct async *async = data;
|
||||||
return async->proc(async->fd_for_proc, async->data);
|
return async->proc(async->proc_in, async->proc_out, async->data);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int start_async(struct async *async)
|
int start_async(struct async *async)
|
||||||
{
|
{
|
||||||
int pipe_out[2];
|
int need_in, need_out;
|
||||||
|
int fdin[2], fdout[2];
|
||||||
|
int proc_in, proc_out;
|
||||||
|
|
||||||
if (pipe(pipe_out) < 0)
|
need_in = async->in < 0;
|
||||||
|
if (need_in) {
|
||||||
|
if (pipe(fdin) < 0) {
|
||||||
|
if (async->out > 0)
|
||||||
|
close(async->out);
|
||||||
return error("cannot create pipe: %s", strerror(errno));
|
return error("cannot create pipe: %s", strerror(errno));
|
||||||
async->out = pipe_out[0];
|
}
|
||||||
|
async->in = fdin[1];
|
||||||
|
}
|
||||||
|
|
||||||
|
need_out = async->out < 0;
|
||||||
|
if (need_out) {
|
||||||
|
if (pipe(fdout) < 0) {
|
||||||
|
if (need_in)
|
||||||
|
close_pair(fdin);
|
||||||
|
else if (async->in)
|
||||||
|
close(async->in);
|
||||||
|
return error("cannot create pipe: %s", strerror(errno));
|
||||||
|
}
|
||||||
|
async->out = fdout[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (need_in)
|
||||||
|
proc_in = fdin[0];
|
||||||
|
else if (async->in)
|
||||||
|
proc_in = async->in;
|
||||||
|
else
|
||||||
|
proc_in = -1;
|
||||||
|
|
||||||
|
if (need_out)
|
||||||
|
proc_out = fdout[1];
|
||||||
|
else if (async->out)
|
||||||
|
proc_out = async->out;
|
||||||
|
else
|
||||||
|
proc_out = -1;
|
||||||
|
|
||||||
#ifndef WIN32
|
#ifndef WIN32
|
||||||
/* Flush stdio before fork() to avoid cloning buffers */
|
/* Flush stdio before fork() to avoid cloning buffers */
|
||||||
@ -346,24 +380,47 @@ int start_async(struct async *async)
|
|||||||
async->pid = fork();
|
async->pid = fork();
|
||||||
if (async->pid < 0) {
|
if (async->pid < 0) {
|
||||||
error("fork (async) failed: %s", strerror(errno));
|
error("fork (async) failed: %s", strerror(errno));
|
||||||
close_pair(pipe_out);
|
goto error;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
if (!async->pid) {
|
if (!async->pid) {
|
||||||
close(pipe_out[0]);
|
if (need_in)
|
||||||
exit(!!async->proc(pipe_out[1], async->data));
|
close(fdin[1]);
|
||||||
|
if (need_out)
|
||||||
|
close(fdout[0]);
|
||||||
|
exit(!!async->proc(proc_in, proc_out, async->data));
|
||||||
}
|
}
|
||||||
close(pipe_out[1]);
|
|
||||||
|
if (need_in)
|
||||||
|
close(fdin[0]);
|
||||||
|
else if (async->in)
|
||||||
|
close(async->in);
|
||||||
|
|
||||||
|
if (need_out)
|
||||||
|
close(fdout[1]);
|
||||||
|
else if (async->out)
|
||||||
|
close(async->out);
|
||||||
#else
|
#else
|
||||||
async->fd_for_proc = pipe_out[1];
|
async->proc_in = proc_in;
|
||||||
|
async->proc_out = proc_out;
|
||||||
async->tid = (HANDLE) _beginthreadex(NULL, 0, run_thread, async, 0, NULL);
|
async->tid = (HANDLE) _beginthreadex(NULL, 0, run_thread, async, 0, NULL);
|
||||||
if (!async->tid) {
|
if (!async->tid) {
|
||||||
error("cannot create thread: %s", strerror(errno));
|
error("cannot create thread: %s", strerror(errno));
|
||||||
close_pair(pipe_out);
|
goto error;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
error:
|
||||||
|
if (need_in)
|
||||||
|
close_pair(fdin);
|
||||||
|
else if (async->in)
|
||||||
|
close(async->in);
|
||||||
|
|
||||||
|
if (need_out)
|
||||||
|
close_pair(fdout);
|
||||||
|
else if (async->out)
|
||||||
|
close(async->out);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int finish_async(struct async *async)
|
int finish_async(struct async *async)
|
||||||
|
@ -64,17 +64,20 @@ int run_command_v_opt_cd_env(const char **argv, int opt, const char *dir, const
|
|||||||
*/
|
*/
|
||||||
struct async {
|
struct async {
|
||||||
/*
|
/*
|
||||||
* proc writes to fd and closes it;
|
* proc reads from in; closes it before return
|
||||||
|
* proc writes to out; closes it before return
|
||||||
* returns 0 on success, non-zero on failure
|
* returns 0 on success, non-zero on failure
|
||||||
*/
|
*/
|
||||||
int (*proc)(int fd, void *data);
|
int (*proc)(int in, int out, void *data);
|
||||||
void *data;
|
void *data;
|
||||||
|
int in; /* caller writes here and closes it */
|
||||||
int out; /* caller reads from here and closes it */
|
int out; /* caller reads from here and closes it */
|
||||||
#ifndef WIN32
|
#ifndef WIN32
|
||||||
pid_t pid;
|
pid_t pid;
|
||||||
#else
|
#else
|
||||||
HANDLE tid;
|
HANDLE tid;
|
||||||
int fd_for_proc;
|
int proc_in;
|
||||||
|
int proc_out;
|
||||||
#endif
|
#endif
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -105,12 +105,12 @@ static void show_edge(struct commit *commit)
|
|||||||
fprintf(pack_pipe, "-%s\n", sha1_to_hex(commit->object.sha1));
|
fprintf(pack_pipe, "-%s\n", sha1_to_hex(commit->object.sha1));
|
||||||
}
|
}
|
||||||
|
|
||||||
static int do_rev_list(int fd, void *create_full_pack)
|
static int do_rev_list(int in, int out, void *create_full_pack)
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
struct rev_info revs;
|
struct rev_info revs;
|
||||||
|
|
||||||
pack_pipe = xfdopen(fd, "w");
|
pack_pipe = xfdopen(out, "w");
|
||||||
init_revisions(&revs, NULL);
|
init_revisions(&revs, NULL);
|
||||||
revs.tag_objects = 1;
|
revs.tag_objects = 1;
|
||||||
revs.tree_objects = 1;
|
revs.tree_objects = 1;
|
||||||
@ -162,8 +162,9 @@ static void create_pack_file(void)
|
|||||||
int arg = 0;
|
int arg = 0;
|
||||||
|
|
||||||
if (shallow_nr) {
|
if (shallow_nr) {
|
||||||
|
memset(&rev_list, 0, sizeof(rev_list));
|
||||||
rev_list.proc = do_rev_list;
|
rev_list.proc = do_rev_list;
|
||||||
rev_list.data = 0;
|
rev_list.out = -1;
|
||||||
if (start_async(&rev_list))
|
if (start_async(&rev_list))
|
||||||
die("git upload-pack: unable to fork git-rev-list");
|
die("git upload-pack: unable to fork git-rev-list");
|
||||||
argv[arg++] = "pack-objects";
|
argv[arg++] = "pack-objects";
|
||||||
|
Loading…
Reference in New Issue
Block a user