#include "cache.h"
#include "simple-ipc.h"
#include "strbuf.h"
#include "pkt-line.h"
#include "thread-utils.h"
#include "unix-socket.h"
#include "unix-stream-server.h"

#ifndef SUPPORTS_SIMPLE_IPC
/*
 * This source file should only be compiled when Simple IPC is supported.
 * See the top-level Makefile.
 */
#error SUPPORTS_SIMPLE_IPC not defined
#endif

enum ipc_active_state ipc_get_active_state(const char *path)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	struct ipc_client_connect_options options
		= IPC_CLIENT_CONNECT_OPTIONS_INIT;
	struct stat st;
	struct ipc_client_connection *connection_test = NULL;

	options.wait_if_busy = 0;
	options.wait_if_not_found = 0;

	if (lstat(path, &st) == -1) {
		switch (errno) {
		case ENOENT:
		case ENOTDIR:
			return IPC_STATE__NOT_LISTENING;
		default:
			return IPC_STATE__INVALID_PATH;
		}
	}

	/* also complain if a plain file is in the way */
	if ((st.st_mode & S_IFMT) != S_IFSOCK)
		return IPC_STATE__INVALID_PATH;

	/*
	 * Just because the filesystem has a S_IFSOCK type inode
	 * at `path`, doesn't mean it that there is a server listening.
	 * Ping it to be sure.
	 */
	state = ipc_client_try_connect(path, &options, &connection_test);
	ipc_client_close_connection(connection_test);

	return state;
}

/*
 * Retry interval, in milliseconds, between successive attempts to
 * connect to a server (see connect_to_server()).
 *
 * This value should be short enough that we don't seriously delay our
 * caller, but not so short that our spinning puts pressure on the
 * system.
 */
#define WAIT_STEP_MS (50)

/*
 * Try to connect to the server at `path`, retrying every WAIT_STEP_MS
 * milliseconds until roughly `timeout_ms` milliseconds of waiting have
 * been accumulated.  If the server is just starting up or is very
 * busy, we may not get a connection the first time.
 *
 * On success, the connected descriptor is stored in `*pfd` and
 * IPC_STATE__LISTENING is returned.  On every other path `*pfd` is -1.
 *
 * Failed attempts are classified by errno:
 *   - ENOENT: retry if `options->wait_if_not_found`, otherwise return
 *     IPC_STATE__PATH_NOT_FOUND.
 *   - ETIMEDOUT / ECONNREFUSED: retry if `options->wait_if_busy`,
 *     otherwise return IPC_STATE__NOT_LISTENING.
 *   - anything else: return IPC_STATE__OTHER_ERROR.
 *
 * Running out of time yields IPC_STATE__NOT_LISTENING.
 */
static enum ipc_active_state connect_to_server(
	const char *path,
	int timeout_ms,
	const struct ipc_client_connect_options *options,
	int *pfd)
{
	int waited_ms = 0;

	*pfd = -1;

	while (waited_ms < timeout_ms) {
		int fd = unix_stream_connect(path, options->uds_disallow_chdir);

		if (fd != -1) {
			*pfd = fd;
			return IPC_STATE__LISTENING;
		}

		switch (errno) {
		case ENOENT:
			if (!options->wait_if_not_found)
				return IPC_STATE__PATH_NOT_FOUND;
			break;

		case ETIMEDOUT:
		case ECONNREFUSED:
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;
			break;

		default:
			return IPC_STATE__OTHER_ERROR;
		}

		/* The caller asked us to be patient: pause, then retry. */
		sleep_millisec(WAIT_STEP_MS);
		waited_ms += WAIT_STEP_MS;
	}

	return IPC_STATE__NOT_LISTENING;
}

/*
 * The total amount of time, in milliseconds, that we are willing to
 * wait when trying to connect to a server.  ipc_client_try_connect()
 * passes this to connect_to_server() as its retry budget.
 *
 * When the server is first started, it might take a little while for
 * it to become ready to service requests.  Likewise, the server may
 * be very (temporarily) busy and not respond to our connections.
 *
 * We should gracefully and silently handle those conditions and try
 * again for a reasonable time period.
 *
 * The value chosen here should be long enough for the server
 * to reliably heal from the above conditions.
 */
#define MY_CONNECTION_TIMEOUT_MS (1000)

enum ipc_active_state ipc_client_try_connect(
	const char *path,
	const struct ipc_client_connect_options *options,
	struct ipc_client_connection **p_connection)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	int fd = -1;

	*p_connection = NULL;

	trace2_region_enter("ipc-client", "try-connect", NULL);
	trace2_data_string("ipc-client", NULL, "try-connect/path", path);

	state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
				  options, &fd);

	trace2_data_intmax("ipc-client", NULL, "try-connect/state",
			   (intmax_t)state);
	trace2_region_leave("ipc-client", "try-connect", NULL);

	if (state == IPC_STATE__LISTENING) {
		(*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
		(*p_connection)->fd = fd;
	}

	return state;
}

/*
 * Close the socket held by `connection` (if it has one) and free the
 * connection itself.  Passing NULL is a no-op.
 */
void ipc_client_close_connection(struct ipc_client_connection *connection)
{
	if (connection) {
		if (connection->fd != -1)
			close(connection->fd);

		free(connection);
	}
}

/*
 * Send the NUL-terminated `message` over an established connection as
 * a packetized request followed by a flush, then read the packetized
 * response into `answer` (which is emptied first).
 *
 * Returns 0 on success, or the value of error() after reporting that
 * either the send or the read failed.  The exchange is bracketed by a
 * trace2 region.
 */
int ipc_client_send_command_to_connection(
	struct ipc_client_connection *connection,
	const char *message, struct strbuf *answer)
{
	const int read_flags = PACKET_READ_GENTLE_ON_EOF |
			       PACKET_READ_GENTLE_ON_READ_ERROR;
	int ret = 0;

	strbuf_setlen(answer, 0);

	trace2_region_enter("ipc-client", "send-command", NULL);

	if (write_packetized_from_buf_no_flush(message, strlen(message),
					       connection->fd) < 0 ||
	    packet_flush_gently(connection->fd) < 0)
		ret = error(_("could not send IPC command"));
	else if (read_packetized_to_strbuf(connection->fd, answer,
					   read_flags) < 0)
		ret = error(_("could not read IPC response"));

	trace2_region_leave("ipc-client", "send-command", NULL);
	return ret;
}

/*
 * One-shot convenience wrapper: connect to the server at `path`, send
 * `message`, collect the reply in `answer`, and disconnect.
 *
 * Returns -1 if no connection could be established; otherwise returns
 * whatever ipc_client_send_command_to_connection() returned.
 */
int ipc_client_send_command(const char *path,
			    const struct ipc_client_connect_options *options,
			    const char *message, struct strbuf *answer)
{
	struct ipc_client_connection *conn = NULL;
	int result;

	if (ipc_client_try_connect(path, options, &conn) !=
	    IPC_STATE__LISTENING)
		return -1;

	result = ipc_client_send_command_to_connection(conn, message, answer);
	ipc_client_close_connection(conn);

	return result;
}

/*
 * Turn O_NONBLOCK on (`make_nonblocking` != 0) or off
 * (`make_nonblocking` == 0) for `fd`, leaving the other file status
 * flags as they were.
 *
 * Returns -1 if the current flags cannot be read; otherwise returns
 * the result of the fcntl(F_SETFL) call.
 */
static int set_socket_blocking_flag(int fd, int make_nonblocking)
{
	int current = fcntl(fd, F_GETFL, NULL);

	if (current < 0)
		return -1;

	return fcntl(fd, F_SETFL,
		     make_nonblocking ? (current | O_NONBLOCK)
				      : (current & ~O_NONBLOCK));
}

/*
 * Magic numbers used to annotate callback instance data.
 * These are used to help guard against accidentally passing the
 * wrong instance data across multiple levels of callbacks (which
 * is easy to do if there are `void*` arguments).
 *
 * Each of the structures below stores one of these tags in its
 * `magic` field.
 *
 * NOTE(review): the enumerators carry no explicit values, so the
 * first one is 0 -- the same value a zero-filled structure holds.
 * Confirm whether the check on MAGIC_SERVER_REPLY_DATA is meant to
 * catch zeroed memory as well.
 */
enum magic {
	MAGIC_SERVER_REPLY_DATA,
	MAGIC_WORKER_THREAD_DATA,
	MAGIC_ACCEPT_THREAD_DATA,
	MAGIC_SERVER_DATA,
};

/*
 * Per-request context handed to the application callback so that it
 * can send its response through do_io_reply_callback().
 */
struct ipc_server_reply_data {
	enum magic magic; /* MAGIC_SERVER_REPLY_DATA */
	int fd; /* client socket the reply is written to */
	struct ipc_worker_thread_data *worker_thread_data; /* worker handling this request */
};

/*
 * Instance data for one worker thread in the pool.
 */
struct ipc_worker_thread_data {
	enum magic magic; /* MAGIC_WORKER_THREAD_DATA */
	struct ipc_worker_thread_data *next_thread; /* next entry in server_data->worker_thread_list */
	struct ipc_server_data *server_data; /* owning server */
	pthread_t pthread_id; /* filled in by pthread_create() */
};

/*
 * Instance data for the single accept thread.
 */
struct ipc_accept_thread_data {
	enum magic magic; /* MAGIC_ACCEPT_THREAD_DATA */
	struct ipc_server_data *server_data; /* owning server */

	/* listening socket that client connections are accepted from */
	struct unix_ss_socket *server_socket;

	/*
	 * The two ends of a socketpair: ipc_server_stop_async() writes
	 * a byte to `fd_send_shutdown`, and the accept thread polls
	 * `fd_wait_shutdown` to notice it.
	 */
	int fd_send_shutdown;
	int fd_wait_shutdown;
	pthread_t pthread_id; /* filled in by pthread_create() */
};

/*
 * With unix-sockets, the conceptual "ipc-server" is implemented as a single
 * controller "accept-thread" thread and a pool of "worker-thread" threads.
 * The former does the usual `accept()` loop and dispatches connections
 * to an idle worker thread.  The worker threads wait in an idle loop for
 * a new connection, communicate with the client and relay data to/from
 * the `application_cb` and then wait for another connection from the
 * server thread.  This avoids the overhead of constantly creating and
 * destroying threads.
 */
struct ipc_server_data {
	enum magic magic; /* MAGIC_SERVER_DATA */
	ipc_server_application_cb *application_cb; /* invoked once per client request */
	void *application_data; /* passed through to application_cb */
	struct strbuf buf_path; /* copy of the socket path (used in diagnostics) */

	struct ipc_accept_thread_data *accept_thread;
	struct ipc_worker_thread_data *worker_thread_list; /* singly-linked via next_thread */

	/*
	 * `work_available_mutex` guards the FIFO and `shutdown_requested`.
	 * `work_available_cond` is broadcast when a connection is queued
	 * or a shutdown is requested.
	 */
	pthread_mutex_t work_available_mutex;
	pthread_cond_t work_available_cond;

	/*
	 * Accepted but not yet processed client connections are kept
	 * in a circular buffer FIFO.  The queue is empty when the
	 * positions are equal.
	 *
	 * fifo_enqueue() treats the queue as full when advancing
	 * `back_pos` would make it equal to `front_pos`, so one slot
	 * always stays unused.
	 */
	int *fifo_fds; /* array of `queue_size` descriptors */
	int queue_size;
	int back_pos; /* slot the next enqueue writes to */
	int front_pos; /* slot the next dequeue reads from */

	int shutdown_requested; /* set by ipc_server_stop_async() */
	int is_stopped; /* set by ipc_server_await() once all threads are joined */
};

/*
 * Pop the oldest queued client descriptor off the front of the FIFO.
 * The vacated slot is overwritten with -1 and the front position
 * advances, wrapping around at `queue_size`.
 *
 * Returns the descriptor, or -1 if the FIFO is empty.
 */
static int fifo_dequeue(struct ipc_server_data *server_data)
{
	/* ASSERT holding mutex */

	int slot = server_data->front_pos;
	int fd;

	if (slot == server_data->back_pos)
		return -1;

	fd = server_data->fifo_fds[slot];
	server_data->fifo_fds[slot] = -1;

	if (++slot == server_data->queue_size)
		slot = 0;
	server_data->front_pos = slot;

	return fd;
}

/*
 * Append `fd` to the back of the FIFO.
 *
 * Returns `fd` on success.  If the FIFO is already full, `fd` is
 * closed (the connection is dropped) and -1 is returned.
 */
static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
{
	/* ASSERT holding mutex */

	int slot = server_data->back_pos;
	int following = slot + 1;

	if (following == server_data->queue_size)
		following = 0;

	if (following == server_data->front_pos) {
		/* No room left: drop this connection. */
		close(fd);
		return -1;
	}

	server_data->fifo_fds[slot] = fd;
	server_data->back_pos = following;

	return fd;
}

/*
 * Block until a client connection can be taken from the FIFO, or
 * until a shutdown has been requested.
 *
 * Returns the dequeued descriptor, or -1 when shutting down.
 */
static int worker_thread__wait_for_connection(
	struct ipc_worker_thread_data *worker_thread_data)
{
	/* ASSERT NOT holding mutex */

	struct ipc_server_data *sd = worker_thread_data->server_data;
	int fd = -1;

	pthread_mutex_lock(&sd->work_available_mutex);

	while (!sd->shutdown_requested) {
		fd = fifo_dequeue(sd);
		if (fd >= 0)
			break;

		/* Nothing queued: sleep until the next broadcast. */
		pthread_cond_wait(&sd->work_available_cond,
				  &sd->work_available_mutex);
	}

	pthread_mutex_unlock(&sd->work_available_mutex);

	return fd;
}

/*
 * Declare the reply callback through its typedef before defining it.
 * A signature mismatch is then reported at the definition itself
 * (which is easier to understand) and not only where the function is
 * passed as a callback argument.
 */
static ipc_server_reply_cb do_io_reply_callback;

/*
 * Forward a piece of the application's response to the client as
 * packetized data.  No flush is sent here, so the application may
 * call this repeatedly to deliver a response in chunks.
 *
 * Returns the result of the packetized write.
 */
static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
				const char *response, size_t response_len)
{
	int client_fd;

	if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
		BUG("reply_cb called with wrong instance data");

	client_fd = reply_data->fd;

	return write_packetized_from_buf_no_flush(response, response_len,
						  client_fd);
}

/*
 * poll() timeout, in milliseconds, used by a worker thread while it
 * waits for a newly accepted client to start sending data.  Each time
 * it expires the worker re-checks whether a shutdown was requested.
 * The specific value was chosen arbitrarily.
 */
#define MY_WAIT_POLL_TIMEOUT_MS (10)

/*
 * Wait for the client on `fd` to put something on the wire.
 *
 * This worker thread is committed to reading the IPC request from
 * this client.  If the client merely pings us (connects and hangs up
 * without sending any data), the pkt-line read routines would spew an
 * error message, so we detect that case here and quietly close the
 * socket instead.
 *
 * The poll uses a short timeout; on each timeout we check whether a
 * shutdown has been requested and, if so, drop the silent client.
 *
 * Return 0 if data (possibly incomplete) is ready to be read.
 * Return -1 if the client hung up, the poll failed, or we are
 * shutting down; `fd` has been closed in that case.
 */
static int worker_thread__wait_for_io_start(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	struct ipc_server_data *sd = worker_thread_data->server_data;
	struct pollfd pfd[1];

	for (;;) {
		int nr_ready;

		pfd[0].fd = fd;
		pfd[0].events = POLLIN;

		nr_ready = poll(pfd, 1, MY_WAIT_POLL_TIMEOUT_MS);

		if (nr_ready < 0) {
			if (errno == EINTR)
				continue;
			break;
		}

		if (!nr_ready) {
			/* Timed out: see whether we should keep waiting. */
			int stopping;

			pthread_mutex_lock(&sd->work_available_mutex);
			stopping = sd->shutdown_requested;
			pthread_mutex_unlock(&sd->work_available_mutex);

			/*
			 * A shutdown is in progress and this client has
			 * not started talking yet, so just drop it.
			 */
			if (stopping)
				break;
			continue;
		}

		/* A hang-up wins even when POLLIN is reported with it. */
		if (!(pfd[0].revents & POLLHUP) &&
		    (pfd[0].revents & POLLIN))
			return 0;

		break;
	}

	close(fd);
	return -1;
}

/*
 * Receive the request/command from the client and pass it to the
 * registered request-callback.  The request-callback will compose
 * a response and call our reply-callback to send it to the client.
 */
static int worker_thread__do_io(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	/* ASSERT NOT holding lock */

	struct strbuf buf = STRBUF_INIT;
	struct ipc_server_reply_data reply_data;
	int ret = 0;

	reply_data.magic = MAGIC_SERVER_REPLY_DATA;
	reply_data.worker_thread_data = worker_thread_data;

	reply_data.fd = fd;

	ret = read_packetized_to_strbuf(
		reply_data.fd, &buf,
		PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
	if (ret >= 0) {
		ret = worker_thread_data->server_data->application_cb(
			worker_thread_data->server_data->application_data,
			buf.buf, do_io_reply_callback, &reply_data);

		packet_flush_gently(reply_data.fd);
	}
	else {
		/*
		 * The client probably disconnected/shutdown before it
		 * could send a well-formed message.  Ignore it.
		 */
	}

	strbuf_release(&buf);
	close(reply_data.fd);

	return ret;
}

/*
 * Block SIGPIPE on the calling thread, so that a write() to a closed
 * peer fails with EPIPE rather than raising the signal.  The thread's
 * previous signal mask is stored in `old_set`.
 *
 * Using sigchain_push() and _pop() around our IO calls instead would
 * not be thread safe:
 * [] It uses a global stack of handler frames.
 * [] It uses ALLOC_GROW() to resize it.
 * [] Finally, according to the `signal(2)` man-page:
 *    "The effects of `signal()` in a multithreaded process are unspecified."
 */
static void thread_block_sigpipe(sigset_t *old_set)
{
	sigset_t only_sigpipe;

	sigemptyset(old_set);

	sigemptyset(&only_sigpipe);
	sigaddset(&only_sigpipe, SIGPIPE);

	pthread_sigmask(SIG_BLOCK, &only_sigpipe, old_set);
}

/*
 * Thread proc for an IPC worker thread.  It services a series of
 * client connections: take the next fd from the queue, process it,
 * then wait for the next one.
 *
 * SIGPIPE stays blocked in this thread for the life of the thread.
 * This avoids stray (and sometimes delayed) SIGPIPE signals caused
 * by client errors and/or when we are under extremely heavy IO load.
 *
 * This means that the application callback will have SIGPIPE blocked.
 * The callback should not change it.
 *
 * The thread exits when a shutdown has been requested or when the
 * application callback returns SIMPLE_IPC_QUIT.  Always returns NULL.
 */
static void *worker_thread_proc(void *_worker_thread_data)
{
	struct ipc_worker_thread_data *wtd = _worker_thread_data;
	struct ipc_server_data *server_data = wtd->server_data;
	sigset_t old_set;

	trace2_thread_start("ipc-worker");

	thread_block_sigpipe(&old_set);

	for (;;) {
		int fd = worker_thread__wait_for_connection(wtd);

		if (fd == -1)
			break; /* in shutdown */

		/* A client that hung up without sending anything is skipped. */
		if (worker_thread__wait_for_io_start(wtd, fd) == -1)
			continue;

		if (worker_thread__do_io(wtd, fd) != SIMPLE_IPC_QUIT)
			continue;

		trace2_data_string("ipc-worker", NULL, "queue_stop_async",
				   "application_quit");

		/*
		 * The application layer is telling the ipc-server layer
		 * to shut down, and there is no response to send to the
		 * client.
		 *
		 * Queue an async stop so that the other threads stop
		 * too, and let this worker exit right away instead of
		 * waiting for the thread-pool shutdown signal.  Other
		 * busy workers may finish with their current clients.
		 */
		ipc_server_stop_async(server_data);
		break;
	}

	trace2_thread_exit();
	return NULL;
}

/*
 * poll() timeout, in milliseconds, used by the accept thread.  Each
 * time it expires without activity, the accept thread checks whether
 * its socket has been stolen.  The specific value was chosen
 * arbitrarily.
 */
#define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)

/*
 * Wait for either a new client connection on the listening socket or
 * a shutdown message on the socket-pair.  Polling both descriptors
 * lets us watch for shutdown requests without spinning on a fast
 * timeout.
 *
 * Returns the accepted client descriptor (>= 0), -1 when a shutdown
 * message is pending, or the negative poll() result on a poll error
 * other than EINTR.
 */
static int accept_thread__wait_for_connection(
	struct ipc_accept_thread_data *accept_thread_data)
{
	struct pollfd fds[2];

	for (;;) {
		int nr_ready;

		fds[0].fd = accept_thread_data->fd_wait_shutdown;
		fds[0].events = POLLIN;

		fds[1].fd = accept_thread_data->server_socket->fd_socket;
		fds[1].events = POLLIN;

		nr_ready = poll(fds, 2, MY_ACCEPT_POLL_TIMEOUT_MS);

		if (nr_ready < 0) {
			if (errno == EINTR)
				continue;
			return nr_ready;
		}

		if (!nr_ready) {
			/*
			 * Timed out.  If someone deleted or force-created
			 * a new unix domain socket at our path, all future
			 * clients are routed elsewhere and we would
			 * silently starve.  Queue a shutdown in that case.
			 */
			if (unix_ss_was_stolen(
				    accept_thread_data->server_socket)) {
				trace2_data_string("ipc-accept", NULL,
						   "queue_stop_async",
						   "socket_stolen");
				ipc_server_stop_async(
					accept_thread_data->server_data);
			}
			continue;
		}

		/* A shutdown message was queued to the socket-pair. */
		if (fds[0].revents & POLLIN)
			return -1;

		if (fds[1].revents & POLLIN) {
			int client_fd =
				accept(accept_thread_data->server_socket->fd_socket,
				       NULL, NULL);

			if (client_fd >= 0)
				return client_fd;

			/*
			 * An accept() error is unlikely; it probably
			 * means that the connecting process has already
			 * dropped the connection.  Go back to waiting.
			 */
			continue;
		}

		BUG("unandled poll result errno=%d r[0]=%d r[1]=%d",
		    errno, fds[0].revents, fds[1].revents);
	}
}

/*
 * Thread proc for the IPC server "accept thread".  It waits for an
 * incoming socket connection, appends it to the queue of available
 * connections, and wakes the worker threads to process it.
 *
 * SIGPIPE stays blocked in this thread for the life of the thread.
 * This avoids any stray SIGPIPE signals when closing fds under
 * extremely heavy loads (such as when the fifo queue is full and we
 * drop incoming connections).
 *
 * The loop ends once a shutdown has been requested; a connection that
 * was accepted at that moment is closed rather than queued.  Always
 * returns NULL.
 */
static void *accept_thread_proc(void *_accept_thread_data)
{
	struct ipc_accept_thread_data *atd = _accept_thread_data;
	struct ipc_server_data *server_data = atd->server_data;
	sigset_t old_set;

	trace2_thread_start("ipc-accept");

	thread_block_sigpipe(&old_set);

	for (;;) {
		int client_fd = accept_thread__wait_for_connection(atd);
		int stopping;

		pthread_mutex_lock(&server_data->work_available_mutex);

		stopping = server_data->shutdown_requested;

		/* A negative fd is a transient accept() error: ignore it. */
		if (!stopping && client_fd >= 0) {
			fifo_enqueue(server_data, client_fd);
			pthread_cond_broadcast(&server_data->work_available_cond);
		}

		pthread_mutex_unlock(&server_data->work_available_mutex);

		if (stopping) {
			if (client_fd >= 0)
				close(client_fd);
			break;
		}
	}

	trace2_thread_exit();
	return NULL;
}

/*
 * We can't predict the connection arrival rate relative to the worker
 * processing rate, therefore we allow the "accept-thread" to queue up
 * a generous number of connections, since we'd rather have the client
 * not unnecessarily timeout if we can avoid it.  (The assumption is
 * that this will be used for FSMonitor and a few second wait on a
 * connection is better than having the client timeout and do the full
 * computation itself.)
 *
 * The FIFO queue size is set to this multiple of the worker pool size
 * (i.e. `queue_size = nr_threads * FIFO_SCALE`).
 * This value was chosen arbitrarily.
 */
#define FIFO_SCALE (100)

/*
 * The backlog value for `listen(2)`.  This doesn't need to be huge,
 * rather just large enough for our "accept-thread" to wake up and
 * queue incoming connections onto the FIFO without the kernel
 * dropping any.
 *
 * This value was chosen arbitrarily.
 */
#define LISTEN_BACKLOG (50)

static int create_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	struct unix_ss_socket *server_socket = NULL;
	struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
	int ret;

	uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
	uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;

	ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
	if (ret)
		return ret;

	if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
		int saved_errno = errno;
		unix_ss_free(server_socket);
		errno = saved_errno;
		return -1;
	}

	*new_server_socket = server_socket;

	trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
	return 0;
}

/*
 * Wrap create_listener_socket() in a trace2 region.
 *
 * Returns whatever create_listener_socket() returned.  errno is saved
 * across the region-leave call so that the caller still sees the value
 * left by the creation attempt.
 */
static int setup_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	int result;
	int errno_after_create;

	trace2_region_enter("ipc-server", "create-listener_socket", NULL);

	result = create_listener_socket(path, ipc_opts, new_server_socket);
	errno_after_create = errno;

	trace2_region_leave("ipc-server", "create-listener_socket", NULL);

	errno = errno_after_create;
	return result;
}

/*
 * Start an IPC server in a pool of background threads.
 *
 * A listening socket is created at `path`, one accept thread is
 * started, and up to `opts->nr_threads` worker threads (at least one)
 * are started to call `application_cb(application_data, ...)` for
 * each client request.
 *
 * On success, returns 0 and stores the new server instance in
 * `*returned_server_data`.
 *
 * Returns -1 (with errno preserved) if the shutdown socket-pair cannot
 * be created or made non-blocking, or the non-zero result of
 * setup_listener_socket() if the listening socket cannot be created.
 * `*returned_server_data` is NULL in those cases.
 *
 * Dies if the accept thread or the first worker thread cannot be
 * started.  If a later worker thread cannot be started, the server
 * carries on with the workers that it already has.
 */
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
			 const char *path, const struct ipc_server_opts *opts,
			 ipc_server_application_cb *application_cb,
			 void *application_data)
{
	struct unix_ss_socket *server_socket = NULL;
	struct ipc_server_data *server_data;
	int sv[2];
	int k;
	int ret;
	int nr_threads = opts->nr_threads;

	*returned_server_data = NULL;

	/*
	 * Create a socketpair and set sv[1] to non-blocking.  This
	 * will be used to send a shutdown message to the accept-thread
	 * and allows the accept-thread to wait on EITHER a client
	 * connection or a shutdown request without spinning.
	 */
	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
		return -1;

	if (set_socket_blocking_flag(sv[1], 1)) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return -1;
	}

	ret = setup_listener_socket(path, opts, &server_socket);
	if (ret) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return ret;
	}

	server_data = xcalloc(1, sizeof(*server_data));
	server_data->magic = MAGIC_SERVER_DATA;
	server_data->application_cb = application_cb;
	server_data->application_data = application_data;
	strbuf_init(&server_data->buf_path, 0);
	strbuf_addstr(&server_data->buf_path, path);

	if (nr_threads < 1)
		nr_threads = 1;

	pthread_mutex_init(&server_data->work_available_mutex, NULL);
	pthread_cond_init(&server_data->work_available_cond, NULL);

	server_data->queue_size = nr_threads * FIFO_SCALE;
	CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);

	server_data->accept_thread =
		xcalloc(1, sizeof(*server_data->accept_thread));
	server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
	server_data->accept_thread->server_data = server_data;
	server_data->accept_thread->server_socket = server_socket;
	server_data->accept_thread->fd_send_shutdown = sv[0];
	server_data->accept_thread->fd_wait_shutdown = sv[1];

	/*
	 * pthread_create() reports failure through its return value and
	 * does not set errno, so copy the code into errno before letting
	 * die_errno() describe it.
	 */
	ret = pthread_create(&server_data->accept_thread->pthread_id, NULL,
			     accept_thread_proc, server_data->accept_thread);
	if (ret) {
		errno = ret;
		die_errno(_("could not start accept_thread '%s'"), path);
	}

	for (k = 0; k < nr_threads; k++) {
		struct ipc_worker_thread_data *wtd;

		wtd = xcalloc(1, sizeof(*wtd));
		wtd->magic = MAGIC_WORKER_THREAD_DATA;
		wtd->server_data = server_data;

		if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
				   wtd)) {
			if (k == 0)
				die(_("could not start worker[0] for '%s'"),
				    path);
			/*
			 * Limp along with the thread pool that we have.
			 * This instance never made it onto the worker
			 * list, so nobody else will free it.
			 */
			free(wtd);
			break;
		}

		wtd->next_thread = server_data->worker_thread_list;
		server_data->worker_thread_list = wtd;
	}

	*returned_server_data = server_data;
	return 0;
}

/*
 * Gently tell the IPC server threads to shut down.
 * Can be run on any thread.
 *
 * Under the work-available mutex this sets `shutdown_requested`, wakes
 * the accept thread through the shutdown socket-pair, closes every
 * connection that is still queued, and wakes all worker threads.  It
 * does not wait for the threads to exit.
 *
 * Always returns 0.  A NULL `server_data` is a no-op.
 */
int ipc_server_stop_async(struct ipc_server_data *server_data)
{
	/* ASSERT NOT holding mutex */

	int fd;

	if (!server_data)
		return 0;

	trace2_region_enter("ipc-server", "server-stop-async", NULL);

	pthread_mutex_lock(&server_data->work_available_mutex);

	server_data->shutdown_requested = 1;

	/*
	 * Write a byte to the shutdown socket pair to wake up the
	 * accept-thread.  A failed write is reported but does not
	 * stop the rest of the shutdown sequence.
	 */
	if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
		error_errno("could not write to fd_send_shutdown");

	/*
	 * Drain the queue of existing connections.  These clients
	 * were accepted but no worker has picked them up yet.
	 */
	while ((fd = fifo_dequeue(server_data)) != -1)
		close(fd);

	/*
	 * Gently tell worker threads to stop processing new connections
	 * and exit.  (This does not abort in-process conversations.)
	 */
	pthread_cond_broadcast(&server_data->work_available_cond);

	pthread_mutex_unlock(&server_data->work_available_mutex);

	trace2_region_leave("ipc-server", "server-stop-async", NULL);

	return 0;
}

/*
 * Wait for all IPC server threads to stop: first the accept thread,
 * then each worker thread in turn.  Every worker's instance data is
 * freed once it has been joined, and the server is then marked as
 * stopped.
 *
 * It is a BUG for the accept thread to have exited without a shutdown
 * having been requested.  Always returns 0.
 */
int ipc_server_await(struct ipc_server_data *server_data)
{
	struct ipc_worker_thread_data *wtd;

	pthread_join(server_data->accept_thread->pthread_id, NULL);

	if (!server_data->shutdown_requested)
		BUG("ipc-server: accept-thread stopped for '%s'",
		    server_data->buf_path.buf);

	while ((wtd = server_data->worker_thread_list) != NULL) {
		pthread_join(wtd->pthread_id, NULL);

		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	server_data->is_stopped = 1;

	return 0;
}

/*
 * Release every resource owned by a stopped server: the listening
 * socket, both ends of the shutdown socket-pair, the accept-thread
 * data, any worker-thread data still on the list, the condition
 * variable and mutex, the path buffer, the FIFO array, and the server
 * structure itself.
 *
 * Passing NULL is a no-op.  It is a BUG to call this on a server that
 * has not been marked as stopped.
 */
void ipc_server_free(struct ipc_server_data *server_data)
{
	struct ipc_accept_thread_data *atd;
	struct ipc_worker_thread_data *wtd;

	if (!server_data)
		return;

	if (!server_data->is_stopped)
		BUG("cannot free ipc-server while running for '%s'",
		    server_data->buf_path.buf);

	atd = server_data->accept_thread;
	if (atd) {
		unix_ss_free(atd->server_socket);

		if (atd->fd_send_shutdown != -1)
			close(atd->fd_send_shutdown);
		if (atd->fd_wait_shutdown != -1)
			close(atd->fd_wait_shutdown);

		free(atd);
	}

	while ((wtd = server_data->worker_thread_list) != NULL) {
		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	pthread_cond_destroy(&server_data->work_available_cond);
	pthread_mutex_destroy(&server_data->work_available_mutex);

	strbuf_release(&server_data->buf_path);

	free(server_data->fifo_fds);
	free(server_data);
}