6aac70a870
Simple IPC always requires threads (in addition to various platform-specific IPC support). Fix the ifdefs in the Makefile to define SUPPORTS_SIMPLE_IPC when appropriate. Previously, the Unix version of the code would only verify that Unix domain sockets were available. This problem was reported here: https://lore.kernel.org/git/YKN5lXs4AoK%2FJFTO@coredump.intra.peff.net/T/#m08be8f1942ea8a2c36cfee0e51cdf06489fdeafc Reported-by: Randall S. Becker <rsbecker@nexbridge.com> Helped-by: Jeff King <peff@peff.net> Signed-off-by: Jeff Hostetler <jeffhost@microsoft.com> Signed-off-by: Junio C Hamano <gitster@pobox.com>
1004 lines
25 KiB
C
1004 lines
25 KiB
C
#include "cache.h"
|
|
#include "simple-ipc.h"
|
|
#include "strbuf.h"
|
|
#include "pkt-line.h"
|
|
#include "thread-utils.h"
|
|
#include "unix-socket.h"
|
|
#include "unix-stream-server.h"
|
|
|
|
#ifndef SUPPORTS_SIMPLE_IPC
|
|
/*
|
|
* This source file should only be compiled when Simple IPC is supported.
|
|
* See the top-level Makefile.
|
|
*/
|
|
#error SUPPORTS_SIMPLE_IPC not defined
|
|
#endif
|
|
|
|
enum ipc_active_state ipc_get_active_state(const char *path)
|
|
{
|
|
enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
|
|
struct ipc_client_connect_options options
|
|
= IPC_CLIENT_CONNECT_OPTIONS_INIT;
|
|
struct stat st;
|
|
struct ipc_client_connection *connection_test = NULL;
|
|
|
|
options.wait_if_busy = 0;
|
|
options.wait_if_not_found = 0;
|
|
|
|
if (lstat(path, &st) == -1) {
|
|
switch (errno) {
|
|
case ENOENT:
|
|
case ENOTDIR:
|
|
return IPC_STATE__NOT_LISTENING;
|
|
default:
|
|
return IPC_STATE__INVALID_PATH;
|
|
}
|
|
}
|
|
|
|
/* also complain if a plain file is in the way */
|
|
if ((st.st_mode & S_IFMT) != S_IFSOCK)
|
|
return IPC_STATE__INVALID_PATH;
|
|
|
|
/*
|
|
* Just because the filesystem has a S_IFSOCK type inode
|
|
* at `path`, doesn't mean it that there is a server listening.
|
|
* Ping it to be sure.
|
|
*/
|
|
state = ipc_client_try_connect(path, &options, &connection_test);
|
|
ipc_client_close_connection(connection_test);
|
|
|
|
return state;
|
|
}
|
|
|
|
/*
|
|
* Retry frequency when trying to connect to a server.
|
|
*
|
|
* This value should be short enough that we don't seriously delay our
|
|
* caller, but not fast enough that our spinning puts pressure on the
|
|
* system.
|
|
*/
|
|
#define WAIT_STEP_MS (50)
|
|
|
|
/*
|
|
* Try to connect to the server. If the server is just starting up or
|
|
* is very busy, we may not get a connection the first time.
|
|
*/
|
|
static enum ipc_active_state connect_to_server(
|
|
const char *path,
|
|
int timeout_ms,
|
|
const struct ipc_client_connect_options *options,
|
|
int *pfd)
|
|
{
|
|
int k;
|
|
|
|
*pfd = -1;
|
|
|
|
for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
|
|
int fd = unix_stream_connect(path, options->uds_disallow_chdir);
|
|
|
|
if (fd != -1) {
|
|
*pfd = fd;
|
|
return IPC_STATE__LISTENING;
|
|
}
|
|
|
|
if (errno == ENOENT) {
|
|
if (!options->wait_if_not_found)
|
|
return IPC_STATE__PATH_NOT_FOUND;
|
|
|
|
goto sleep_and_try_again;
|
|
}
|
|
|
|
if (errno == ETIMEDOUT) {
|
|
if (!options->wait_if_busy)
|
|
return IPC_STATE__NOT_LISTENING;
|
|
|
|
goto sleep_and_try_again;
|
|
}
|
|
|
|
if (errno == ECONNREFUSED) {
|
|
if (!options->wait_if_busy)
|
|
return IPC_STATE__NOT_LISTENING;
|
|
|
|
goto sleep_and_try_again;
|
|
}
|
|
|
|
return IPC_STATE__OTHER_ERROR;
|
|
|
|
sleep_and_try_again:
|
|
sleep_millisec(WAIT_STEP_MS);
|
|
}
|
|
|
|
return IPC_STATE__NOT_LISTENING;
|
|
}
|
|
|
|
/*
|
|
* The total amount of time that we are willing to wait when trying to
|
|
* connect to a server.
|
|
*
|
|
* When the server is first started, it might take a little while for
|
|
* it to become ready to service requests. Likewise, the server may
|
|
* be very (temporarily) busy and not respond to our connections.
|
|
*
|
|
* We should gracefully and silently handle those conditions and try
|
|
* again for a reasonable time period.
|
|
*
|
|
* The value chosen here should be long enough for the server
|
|
* to reliably heal from the above conditions.
|
|
*/
|
|
#define MY_CONNECTION_TIMEOUT_MS (1000)
|
|
|
|
enum ipc_active_state ipc_client_try_connect(
|
|
const char *path,
|
|
const struct ipc_client_connect_options *options,
|
|
struct ipc_client_connection **p_connection)
|
|
{
|
|
enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
|
|
int fd = -1;
|
|
|
|
*p_connection = NULL;
|
|
|
|
trace2_region_enter("ipc-client", "try-connect", NULL);
|
|
trace2_data_string("ipc-client", NULL, "try-connect/path", path);
|
|
|
|
state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
|
|
options, &fd);
|
|
|
|
trace2_data_intmax("ipc-client", NULL, "try-connect/state",
|
|
(intmax_t)state);
|
|
trace2_region_leave("ipc-client", "try-connect", NULL);
|
|
|
|
if (state == IPC_STATE__LISTENING) {
|
|
(*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
|
|
(*p_connection)->fd = fd;
|
|
}
|
|
|
|
return state;
|
|
}
|
|
|
|
void ipc_client_close_connection(struct ipc_client_connection *connection)
|
|
{
|
|
if (!connection)
|
|
return;
|
|
|
|
if (connection->fd != -1)
|
|
close(connection->fd);
|
|
|
|
free(connection);
|
|
}
|
|
|
|
int ipc_client_send_command_to_connection(
|
|
struct ipc_client_connection *connection,
|
|
const char *message, struct strbuf *answer)
|
|
{
|
|
int ret = 0;
|
|
|
|
strbuf_setlen(answer, 0);
|
|
|
|
trace2_region_enter("ipc-client", "send-command", NULL);
|
|
|
|
if (write_packetized_from_buf_no_flush(message, strlen(message),
|
|
connection->fd) < 0 ||
|
|
packet_flush_gently(connection->fd) < 0) {
|
|
ret = error(_("could not send IPC command"));
|
|
goto done;
|
|
}
|
|
|
|
if (read_packetized_to_strbuf(
|
|
connection->fd, answer,
|
|
PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
|
|
ret = error(_("could not read IPC response"));
|
|
goto done;
|
|
}
|
|
|
|
done:
|
|
trace2_region_leave("ipc-client", "send-command", NULL);
|
|
return ret;
|
|
}
|
|
|
|
int ipc_client_send_command(const char *path,
|
|
const struct ipc_client_connect_options *options,
|
|
const char *message, struct strbuf *answer)
|
|
{
|
|
int ret = -1;
|
|
enum ipc_active_state state;
|
|
struct ipc_client_connection *connection = NULL;
|
|
|
|
state = ipc_client_try_connect(path, options, &connection);
|
|
|
|
if (state != IPC_STATE__LISTENING)
|
|
return ret;
|
|
|
|
ret = ipc_client_send_command_to_connection(connection, message, answer);
|
|
|
|
ipc_client_close_connection(connection);
|
|
|
|
return ret;
|
|
}
|
|
|
|
/*
 * Turn O_NONBLOCK on (`make_nonblocking` non-zero) or off (zero) for
 * `fd`, leaving the other file status flags as they were.
 *
 * Returns -1 if the current flags cannot be read; otherwise returns
 * the result of the F_SETFL call.
 */
static int set_socket_blocking_flag(int fd, int make_nonblocking)
{
	int cur = fcntl(fd, F_GETFL, NULL);

	if (cur < 0)
		return -1;

	return fcntl(fd, F_SETFL,
		     make_nonblocking ? (cur | O_NONBLOCK)
				      : (cur & ~O_NONBLOCK));
}
|
|
|
|
/*
 * Magic numbers used to annotate callback instance data.
 * These are used to help guard against accidentally passing the
 * wrong instance data across multiple levels of callbacks (which
 * is easy to do if there are `void*` arguments).
 *
 * Each value tags one of the structures defined in this file; the
 * tag is stored in that structure's `magic` field when it is set up.
 */
enum magic {
	MAGIC_SERVER_REPLY_DATA,  /* struct ipc_server_reply_data */
	MAGIC_WORKER_THREAD_DATA, /* struct ipc_worker_thread_data */
	MAGIC_ACCEPT_THREAD_DATA, /* struct ipc_accept_thread_data */
	MAGIC_SERVER_DATA,        /* struct ipc_server_data */
};
|
|
|
|
/*
 * Per-request context handed to the application callback so that it
 * can send its response back through the reply callback.
 */
struct ipc_server_reply_data {
	enum magic magic; /* MAGIC_SERVER_REPLY_DATA */
	int fd; /* client connection that the response is written to */
	struct ipc_worker_thread_data *worker_thread_data; /* worker handling this request */
};
|
|
|
|
/*
 * Bookkeeping for one worker thread in the server's thread pool.
 */
struct ipc_worker_thread_data {
	enum magic magic; /* MAGIC_WORKER_THREAD_DATA */
	struct ipc_worker_thread_data *next_thread; /* link in server_data->worker_thread_list */
	struct ipc_server_data *server_data; /* the server this worker belongs to */
	pthread_t pthread_id; /* joined when the server is awaited */
};
|
|
|
|
/*
 * Bookkeeping for the single "accept-thread" of a server.
 */
struct ipc_accept_thread_data {
	enum magic magic; /* MAGIC_ACCEPT_THREAD_DATA */
	struct ipc_server_data *server_data; /* the server this thread belongs to */

	struct unix_ss_socket *server_socket; /* listening socket; its fd_socket is polled and accept()ed on */

	/*
	 * The two ends of a socketpair.  A shutdown request writes a
	 * byte to fd_send_shutdown; the accept-thread polls
	 * fd_wait_shutdown alongside the listening socket.
	 */
	int fd_send_shutdown;
	int fd_wait_shutdown;
	pthread_t pthread_id; /* joined when the server is awaited */
};
|
|
|
|
/*
 * With unix-sockets, the conceptual "ipc-server" is implemented as a single
 * controller "accept-thread" thread and a pool of "worker-thread" threads.
 * The former does the usual `accept()` loop and dispatches connections
 * to an idle worker thread.  The worker threads wait in an idle loop for
 * a new connection, communicate with the client and relay data to/from
 * the `application_cb` and then wait for another connection from the
 * server thread.  This avoids the overhead of constantly creating and
 * destroying threads.
 */
struct ipc_server_data {
	enum magic magic; /* MAGIC_SERVER_DATA */
	ipc_server_application_cb *application_cb; /* invoked by a worker for each request read */
	void *application_data; /* passed as the first argument to application_cb */
	struct strbuf buf_path; /* copy of the socket path given at startup */

	struct ipc_accept_thread_data *accept_thread;
	struct ipc_worker_thread_data *worker_thread_list; /* singly linked via next_thread */

	/*
	 * The mutex is held while the FIFO and `shutdown_requested` are
	 * accessed by the server threads.  The condition is broadcast
	 * when a connection is queued and when a shutdown is requested.
	 */
	pthread_mutex_t work_available_mutex;
	pthread_cond_t work_available_cond;

	/*
	 * Accepted but not yet processed client connections are kept
	 * in a circular buffer FIFO.  The queue is empty when the
	 * positions are equal.  It is treated as full when advancing
	 * `back_pos` would make it equal to `front_pos`.
	 */
	int *fifo_fds; /* array of `queue_size` descriptors */
	int queue_size;
	int back_pos; /* slot that the next enqueue writes */
	int front_pos; /* slot that the next dequeue reads */

	int shutdown_requested; /* set by ipc_server_stop_async() */
	int is_stopped; /* set by ipc_server_await() once all threads are joined */
};
|
|
|
|
/*
|
|
* Remove and return the oldest queued connection.
|
|
*
|
|
* Returns -1 if empty.
|
|
*/
|
|
static int fifo_dequeue(struct ipc_server_data *server_data)
|
|
{
|
|
/* ASSERT holding mutex */
|
|
|
|
int fd;
|
|
|
|
if (server_data->back_pos == server_data->front_pos)
|
|
return -1;
|
|
|
|
fd = server_data->fifo_fds[server_data->front_pos];
|
|
server_data->fifo_fds[server_data->front_pos] = -1;
|
|
|
|
server_data->front_pos++;
|
|
if (server_data->front_pos == server_data->queue_size)
|
|
server_data->front_pos = 0;
|
|
|
|
return fd;
|
|
}
|
|
|
|
/*
|
|
* Push a new fd onto the back of the queue.
|
|
*
|
|
* Drop it and return -1 if queue is already full.
|
|
*/
|
|
static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
|
|
{
|
|
/* ASSERT holding mutex */
|
|
|
|
int next_back_pos;
|
|
|
|
next_back_pos = server_data->back_pos + 1;
|
|
if (next_back_pos == server_data->queue_size)
|
|
next_back_pos = 0;
|
|
|
|
if (next_back_pos == server_data->front_pos) {
|
|
/* Queue is full. Just drop it. */
|
|
close(fd);
|
|
return -1;
|
|
}
|
|
|
|
server_data->fifo_fds[server_data->back_pos] = fd;
|
|
server_data->back_pos = next_back_pos;
|
|
|
|
return fd;
|
|
}
|
|
|
|
/*
|
|
* Wait for a connection to be queued to the FIFO and return it.
|
|
*
|
|
* Returns -1 if someone has already requested a shutdown.
|
|
*/
|
|
static int worker_thread__wait_for_connection(
|
|
struct ipc_worker_thread_data *worker_thread_data)
|
|
{
|
|
/* ASSERT NOT holding mutex */
|
|
|
|
struct ipc_server_data *server_data = worker_thread_data->server_data;
|
|
int fd = -1;
|
|
|
|
pthread_mutex_lock(&server_data->work_available_mutex);
|
|
for (;;) {
|
|
if (server_data->shutdown_requested)
|
|
break;
|
|
|
|
fd = fifo_dequeue(server_data);
|
|
if (fd >= 0)
|
|
break;
|
|
|
|
pthread_cond_wait(&server_data->work_available_cond,
|
|
&server_data->work_available_mutex);
|
|
}
|
|
pthread_mutex_unlock(&server_data->work_available_mutex);
|
|
|
|
return fd;
|
|
}
|
|
|
|
/*
|
|
* Forward declare our reply callback function so that any compiler
|
|
* errors are reported when we actually define the function (in addition
|
|
* to any errors reported when we try to pass this callback function as
|
|
* a parameter in a function call). The former are easier to understand.
|
|
*/
|
|
static ipc_server_reply_cb do_io_reply_callback;
|
|
|
|
/*
|
|
* Relay application's response message to the client process.
|
|
* (We do not flush at this point because we allow the caller
|
|
* to chunk data to the client thru us.)
|
|
*/
|
|
static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
|
|
const char *response, size_t response_len)
|
|
{
|
|
if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
|
|
BUG("reply_cb called with wrong instance data");
|
|
|
|
return write_packetized_from_buf_no_flush(response, response_len,
|
|
reply_data->fd);
|
|
}
|
|
|
|
/* A randomly chosen value. */
|
|
#define MY_WAIT_POLL_TIMEOUT_MS (10)
|
|
|
|
/*
|
|
* If the client hangs up without sending any data on the wire, just
|
|
* quietly close the socket and ignore this client.
|
|
*
|
|
* This worker thread is committed to reading the IPC request data
|
|
* from the client at the other end of this fd. Wait here for the
|
|
* client to actually put something on the wire -- because if the
|
|
* client just does a ping (connect and hangup without sending any
|
|
* data), our use of the pkt-line read routines will spew an error
|
|
* message.
|
|
*
|
|
* Return -1 if the client hung up.
|
|
* Return 0 if data (possibly incomplete) is ready.
|
|
*/
|
|
static int worker_thread__wait_for_io_start(
|
|
struct ipc_worker_thread_data *worker_thread_data,
|
|
int fd)
|
|
{
|
|
struct ipc_server_data *server_data = worker_thread_data->server_data;
|
|
struct pollfd pollfd[1];
|
|
int result;
|
|
|
|
for (;;) {
|
|
pollfd[0].fd = fd;
|
|
pollfd[0].events = POLLIN;
|
|
|
|
result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
|
|
if (result < 0) {
|
|
if (errno == EINTR)
|
|
continue;
|
|
goto cleanup;
|
|
}
|
|
|
|
if (result == 0) {
|
|
/* a timeout */
|
|
|
|
int in_shutdown;
|
|
|
|
pthread_mutex_lock(&server_data->work_available_mutex);
|
|
in_shutdown = server_data->shutdown_requested;
|
|
pthread_mutex_unlock(&server_data->work_available_mutex);
|
|
|
|
/*
|
|
* If a shutdown is already in progress and this
|
|
* client has not started talking yet, just drop it.
|
|
*/
|
|
if (in_shutdown)
|
|
goto cleanup;
|
|
continue;
|
|
}
|
|
|
|
if (pollfd[0].revents & POLLHUP)
|
|
goto cleanup;
|
|
|
|
if (pollfd[0].revents & POLLIN)
|
|
return 0;
|
|
|
|
goto cleanup;
|
|
}
|
|
|
|
cleanup:
|
|
close(fd);
|
|
return -1;
|
|
}
|
|
|
|
/*
|
|
* Receive the request/command from the client and pass it to the
|
|
* registered request-callback. The request-callback will compose
|
|
* a response and call our reply-callback to send it to the client.
|
|
*/
|
|
static int worker_thread__do_io(
|
|
struct ipc_worker_thread_data *worker_thread_data,
|
|
int fd)
|
|
{
|
|
/* ASSERT NOT holding lock */
|
|
|
|
struct strbuf buf = STRBUF_INIT;
|
|
struct ipc_server_reply_data reply_data;
|
|
int ret = 0;
|
|
|
|
reply_data.magic = MAGIC_SERVER_REPLY_DATA;
|
|
reply_data.worker_thread_data = worker_thread_data;
|
|
|
|
reply_data.fd = fd;
|
|
|
|
ret = read_packetized_to_strbuf(
|
|
reply_data.fd, &buf,
|
|
PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
|
|
if (ret >= 0) {
|
|
ret = worker_thread_data->server_data->application_cb(
|
|
worker_thread_data->server_data->application_data,
|
|
buf.buf, do_io_reply_callback, &reply_data);
|
|
|
|
packet_flush_gently(reply_data.fd);
|
|
}
|
|
else {
|
|
/*
|
|
* The client probably disconnected/shutdown before it
|
|
* could send a well-formed message. Ignore it.
|
|
*/
|
|
}
|
|
|
|
strbuf_release(&buf);
|
|
close(reply_data.fd);
|
|
|
|
return ret;
|
|
}
|
|
|
|
/*
|
|
* Block SIGPIPE on the current thread (so that we get EPIPE from
|
|
* write() rather than an actual signal).
|
|
*
|
|
* Note that using sigchain_push() and _pop() to control SIGPIPE
|
|
* around our IO calls is not thread safe:
|
|
* [] It uses a global stack of handler frames.
|
|
* [] It uses ALLOC_GROW() to resize it.
|
|
* [] Finally, according to the `signal(2)` man-page:
|
|
* "The effects of `signal()` in a multithreaded process are unspecified."
|
|
*/
|
|
static void thread_block_sigpipe(sigset_t *old_set)
|
|
{
|
|
sigset_t new_set;
|
|
|
|
sigemptyset(&new_set);
|
|
sigaddset(&new_set, SIGPIPE);
|
|
|
|
sigemptyset(old_set);
|
|
pthread_sigmask(SIG_BLOCK, &new_set, old_set);
|
|
}
|
|
|
|
/*
|
|
* Thread proc for an IPC worker thread. It handles a series of
|
|
* connections from clients. It pulls the next fd from the queue
|
|
* processes it, and then waits for the next client.
|
|
*
|
|
* Block SIGPIPE in this worker thread for the life of the thread.
|
|
* This avoids stray (and sometimes delayed) SIGPIPE signals caused
|
|
* by client errors and/or when we are under extremely heavy IO load.
|
|
*
|
|
* This means that the application callback will have SIGPIPE blocked.
|
|
* The callback should not change it.
|
|
*/
|
|
static void *worker_thread_proc(void *_worker_thread_data)
|
|
{
|
|
struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
|
|
struct ipc_server_data *server_data = worker_thread_data->server_data;
|
|
sigset_t old_set;
|
|
int fd, io;
|
|
int ret;
|
|
|
|
trace2_thread_start("ipc-worker");
|
|
|
|
thread_block_sigpipe(&old_set);
|
|
|
|
for (;;) {
|
|
fd = worker_thread__wait_for_connection(worker_thread_data);
|
|
if (fd == -1)
|
|
break; /* in shutdown */
|
|
|
|
io = worker_thread__wait_for_io_start(worker_thread_data, fd);
|
|
if (io == -1)
|
|
continue; /* client hung up without sending anything */
|
|
|
|
ret = worker_thread__do_io(worker_thread_data, fd);
|
|
|
|
if (ret == SIMPLE_IPC_QUIT) {
|
|
trace2_data_string("ipc-worker", NULL, "queue_stop_async",
|
|
"application_quit");
|
|
/*
|
|
* The application layer is telling the ipc-server
|
|
* layer to shutdown.
|
|
*
|
|
* We DO NOT have a response to send to the client.
|
|
*
|
|
* Queue an async stop (to stop the other threads) and
|
|
* allow this worker thread to exit now (no sense waiting
|
|
* for the thread-pool shutdown signal).
|
|
*
|
|
* Other non-idle worker threads are allowed to finish
|
|
* responding to their current clients.
|
|
*/
|
|
ipc_server_stop_async(server_data);
|
|
break;
|
|
}
|
|
}
|
|
|
|
trace2_thread_exit();
|
|
return NULL;
|
|
}
|
|
|
|
/* A randomly chosen value. */
|
|
#define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)
|
|
|
|
/*
|
|
* Accept a new client connection on our socket. This uses non-blocking
|
|
* IO so that we can also wait for shutdown requests on our socket-pair
|
|
* without actually spinning on a fast timeout.
|
|
*/
|
|
static int accept_thread__wait_for_connection(
|
|
struct ipc_accept_thread_data *accept_thread_data)
|
|
{
|
|
struct pollfd pollfd[2];
|
|
int result;
|
|
|
|
for (;;) {
|
|
pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
|
|
pollfd[0].events = POLLIN;
|
|
|
|
pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
|
|
pollfd[1].events = POLLIN;
|
|
|
|
result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
|
|
if (result < 0) {
|
|
if (errno == EINTR)
|
|
continue;
|
|
return result;
|
|
}
|
|
|
|
if (result == 0) {
|
|
/* a timeout */
|
|
|
|
/*
|
|
* If someone deletes or force-creates a new unix
|
|
* domain socket at our path, all future clients
|
|
* will be routed elsewhere and we silently starve.
|
|
* If that happens, just queue a shutdown.
|
|
*/
|
|
if (unix_ss_was_stolen(
|
|
accept_thread_data->server_socket)) {
|
|
trace2_data_string("ipc-accept", NULL,
|
|
"queue_stop_async",
|
|
"socket_stolen");
|
|
ipc_server_stop_async(
|
|
accept_thread_data->server_data);
|
|
}
|
|
continue;
|
|
}
|
|
|
|
if (pollfd[0].revents & POLLIN) {
|
|
/* shutdown message queued to socketpair */
|
|
return -1;
|
|
}
|
|
|
|
if (pollfd[1].revents & POLLIN) {
|
|
/* a connection is available on server_socket */
|
|
|
|
int client_fd =
|
|
accept(accept_thread_data->server_socket->fd_socket,
|
|
NULL, NULL);
|
|
if (client_fd >= 0)
|
|
return client_fd;
|
|
|
|
/*
|
|
* An error here is unlikely -- it probably
|
|
* indicates that the connecting process has
|
|
* already dropped the connection.
|
|
*/
|
|
continue;
|
|
}
|
|
|
|
BUG("unandled poll result errno=%d r[0]=%d r[1]=%d",
|
|
errno, pollfd[0].revents, pollfd[1].revents);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Thread proc for the IPC server "accept thread". This waits for
|
|
* an incoming socket connection, appends it to the queue of available
|
|
* connections, and notifies a worker thread to process it.
|
|
*
|
|
* Block SIGPIPE in this thread for the life of the thread. This
|
|
* avoids any stray SIGPIPE signals when closing pipe fds under
|
|
* extremely heavy loads (such as when the fifo queue is full and we
|
|
* drop incomming connections).
|
|
*/
|
|
static void *accept_thread_proc(void *_accept_thread_data)
|
|
{
|
|
struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
|
|
struct ipc_server_data *server_data = accept_thread_data->server_data;
|
|
sigset_t old_set;
|
|
|
|
trace2_thread_start("ipc-accept");
|
|
|
|
thread_block_sigpipe(&old_set);
|
|
|
|
for (;;) {
|
|
int client_fd = accept_thread__wait_for_connection(
|
|
accept_thread_data);
|
|
|
|
pthread_mutex_lock(&server_data->work_available_mutex);
|
|
if (server_data->shutdown_requested) {
|
|
pthread_mutex_unlock(&server_data->work_available_mutex);
|
|
if (client_fd >= 0)
|
|
close(client_fd);
|
|
break;
|
|
}
|
|
|
|
if (client_fd < 0) {
|
|
/* ignore transient accept() errors */
|
|
}
|
|
else {
|
|
fifo_enqueue(server_data, client_fd);
|
|
pthread_cond_broadcast(&server_data->work_available_cond);
|
|
}
|
|
pthread_mutex_unlock(&server_data->work_available_mutex);
|
|
}
|
|
|
|
trace2_thread_exit();
|
|
return NULL;
|
|
}
|
|
|
|
/*
|
|
* We can't predict the connection arrival rate relative to the worker
|
|
* processing rate, therefore we allow the "accept-thread" to queue up
|
|
* a generous number of connections, since we'd rather have the client
|
|
* not unnecessarily timeout if we can avoid it. (The assumption is
|
|
* that this will be used for FSMonitor and a few second wait on a
|
|
* connection is better than having the client timeout and do the full
|
|
* computation itself.)
|
|
*
|
|
* The FIFO queue size is set to a multiple of the worker pool size.
|
|
* This value chosen at random.
|
|
*/
|
|
#define FIFO_SCALE (100)
|
|
|
|
/*
|
|
* The backlog value for `listen(2)`. This doesn't need to huge,
|
|
* rather just large enough for our "accept-thread" to wake up and
|
|
* queue incoming connections onto the FIFO without the kernel
|
|
* dropping any.
|
|
*
|
|
* This value chosen at random.
|
|
*/
|
|
#define LISTEN_BACKLOG (50)
|
|
|
|
static int create_listener_socket(
|
|
const char *path,
|
|
const struct ipc_server_opts *ipc_opts,
|
|
struct unix_ss_socket **new_server_socket)
|
|
{
|
|
struct unix_ss_socket *server_socket = NULL;
|
|
struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
|
|
int ret;
|
|
|
|
uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
|
|
uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;
|
|
|
|
ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
|
|
if (ret)
|
|
return ret;
|
|
|
|
if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
|
|
int saved_errno = errno;
|
|
unix_ss_free(server_socket);
|
|
errno = saved_errno;
|
|
return -1;
|
|
}
|
|
|
|
*new_server_socket = server_socket;
|
|
|
|
trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
|
|
return 0;
|
|
}
|
|
|
|
static int setup_listener_socket(
|
|
const char *path,
|
|
const struct ipc_server_opts *ipc_opts,
|
|
struct unix_ss_socket **new_server_socket)
|
|
{
|
|
int ret, saved_errno;
|
|
|
|
trace2_region_enter("ipc-server", "create-listener_socket", NULL);
|
|
|
|
ret = create_listener_socket(path, ipc_opts, new_server_socket);
|
|
|
|
saved_errno = errno;
|
|
trace2_region_leave("ipc-server", "create-listener_socket", NULL);
|
|
errno = saved_errno;
|
|
|
|
return ret;
|
|
}
|
|
|
|
/*
|
|
* Start IPC server in a pool of background threads.
|
|
*/
|
|
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
|
|
const char *path, const struct ipc_server_opts *opts,
|
|
ipc_server_application_cb *application_cb,
|
|
void *application_data)
|
|
{
|
|
struct unix_ss_socket *server_socket = NULL;
|
|
struct ipc_server_data *server_data;
|
|
int sv[2];
|
|
int k;
|
|
int ret;
|
|
int nr_threads = opts->nr_threads;
|
|
|
|
*returned_server_data = NULL;
|
|
|
|
/*
|
|
* Create a socketpair and set sv[1] to non-blocking. This
|
|
* will used to send a shutdown message to the accept-thread
|
|
* and allows the accept-thread to wait on EITHER a client
|
|
* connection or a shutdown request without spinning.
|
|
*/
|
|
if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
|
|
return -1;
|
|
|
|
if (set_socket_blocking_flag(sv[1], 1)) {
|
|
int saved_errno = errno;
|
|
close(sv[0]);
|
|
close(sv[1]);
|
|
errno = saved_errno;
|
|
return -1;
|
|
}
|
|
|
|
ret = setup_listener_socket(path, opts, &server_socket);
|
|
if (ret) {
|
|
int saved_errno = errno;
|
|
close(sv[0]);
|
|
close(sv[1]);
|
|
errno = saved_errno;
|
|
return ret;
|
|
}
|
|
|
|
server_data = xcalloc(1, sizeof(*server_data));
|
|
server_data->magic = MAGIC_SERVER_DATA;
|
|
server_data->application_cb = application_cb;
|
|
server_data->application_data = application_data;
|
|
strbuf_init(&server_data->buf_path, 0);
|
|
strbuf_addstr(&server_data->buf_path, path);
|
|
|
|
if (nr_threads < 1)
|
|
nr_threads = 1;
|
|
|
|
pthread_mutex_init(&server_data->work_available_mutex, NULL);
|
|
pthread_cond_init(&server_data->work_available_cond, NULL);
|
|
|
|
server_data->queue_size = nr_threads * FIFO_SCALE;
|
|
CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);
|
|
|
|
server_data->accept_thread =
|
|
xcalloc(1, sizeof(*server_data->accept_thread));
|
|
server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
|
|
server_data->accept_thread->server_data = server_data;
|
|
server_data->accept_thread->server_socket = server_socket;
|
|
server_data->accept_thread->fd_send_shutdown = sv[0];
|
|
server_data->accept_thread->fd_wait_shutdown = sv[1];
|
|
|
|
if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
|
|
accept_thread_proc, server_data->accept_thread))
|
|
die_errno(_("could not start accept_thread '%s'"), path);
|
|
|
|
for (k = 0; k < nr_threads; k++) {
|
|
struct ipc_worker_thread_data *wtd;
|
|
|
|
wtd = xcalloc(1, sizeof(*wtd));
|
|
wtd->magic = MAGIC_WORKER_THREAD_DATA;
|
|
wtd->server_data = server_data;
|
|
|
|
if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
|
|
wtd)) {
|
|
if (k == 0)
|
|
die(_("could not start worker[0] for '%s'"),
|
|
path);
|
|
/*
|
|
* Limp along with the thread pool that we have.
|
|
*/
|
|
break;
|
|
}
|
|
|
|
wtd->next_thread = server_data->worker_thread_list;
|
|
server_data->worker_thread_list = wtd;
|
|
}
|
|
|
|
*returned_server_data = server_data;
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
* Gently tell the IPC server treads to shutdown.
|
|
* Can be run on any thread.
|
|
*/
|
|
int ipc_server_stop_async(struct ipc_server_data *server_data)
|
|
{
|
|
/* ASSERT NOT holding mutex */
|
|
|
|
int fd;
|
|
|
|
if (!server_data)
|
|
return 0;
|
|
|
|
trace2_region_enter("ipc-server", "server-stop-async", NULL);
|
|
|
|
pthread_mutex_lock(&server_data->work_available_mutex);
|
|
|
|
server_data->shutdown_requested = 1;
|
|
|
|
/*
|
|
* Write a byte to the shutdown socket pair to wake up the
|
|
* accept-thread.
|
|
*/
|
|
if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
|
|
error_errno("could not write to fd_send_shutdown");
|
|
|
|
/*
|
|
* Drain the queue of existing connections.
|
|
*/
|
|
while ((fd = fifo_dequeue(server_data)) != -1)
|
|
close(fd);
|
|
|
|
/*
|
|
* Gently tell worker threads to stop processing new connections
|
|
* and exit. (This does not abort in-process conversations.)
|
|
*/
|
|
pthread_cond_broadcast(&server_data->work_available_cond);
|
|
|
|
pthread_mutex_unlock(&server_data->work_available_mutex);
|
|
|
|
trace2_region_leave("ipc-server", "server-stop-async", NULL);
|
|
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
* Wait for all IPC server threads to stop.
|
|
*/
|
|
int ipc_server_await(struct ipc_server_data *server_data)
|
|
{
|
|
pthread_join(server_data->accept_thread->pthread_id, NULL);
|
|
|
|
if (!server_data->shutdown_requested)
|
|
BUG("ipc-server: accept-thread stopped for '%s'",
|
|
server_data->buf_path.buf);
|
|
|
|
while (server_data->worker_thread_list) {
|
|
struct ipc_worker_thread_data *wtd =
|
|
server_data->worker_thread_list;
|
|
|
|
pthread_join(wtd->pthread_id, NULL);
|
|
|
|
server_data->worker_thread_list = wtd->next_thread;
|
|
free(wtd);
|
|
}
|
|
|
|
server_data->is_stopped = 1;
|
|
|
|
return 0;
|
|
}
|
|
|
|
void ipc_server_free(struct ipc_server_data *server_data)
|
|
{
|
|
struct ipc_accept_thread_data * accept_thread_data;
|
|
|
|
if (!server_data)
|
|
return;
|
|
|
|
if (!server_data->is_stopped)
|
|
BUG("cannot free ipc-server while running for '%s'",
|
|
server_data->buf_path.buf);
|
|
|
|
accept_thread_data = server_data->accept_thread;
|
|
if (accept_thread_data) {
|
|
unix_ss_free(accept_thread_data->server_socket);
|
|
|
|
if (accept_thread_data->fd_send_shutdown != -1)
|
|
close(accept_thread_data->fd_send_shutdown);
|
|
if (accept_thread_data->fd_wait_shutdown != -1)
|
|
close(accept_thread_data->fd_wait_shutdown);
|
|
|
|
free(server_data->accept_thread);
|
|
}
|
|
|
|
while (server_data->worker_thread_list) {
|
|
struct ipc_worker_thread_data *wtd =
|
|
server_data->worker_thread_list;
|
|
|
|
server_data->worker_thread_list = wtd->next_thread;
|
|
free(wtd);
|
|
}
|
|
|
|
pthread_cond_destroy(&server_data->work_available_cond);
|
|
pthread_mutex_destroy(&server_data->work_available_mutex);
|
|
|
|
strbuf_release(&server_data->buf_path);
|
|
|
|
free(server_data->fifo_fds);
|
|
free(server_data);
|
|
}
|