diff mbox series

[v7,18/29] fsmonitor--daemon: implement handle_client callback

Message ID 1f4b5209bf6092dc389db4046e0bdedc61cc4581.1647972010.git.gitgitgadget@gmail.com (mailing list archive)
State Superseded
Headers show
Series Builtin FSMonitor Part 2 | expand

Commit Message

Jeff Hostetler March 22, 2022, 5:59 p.m. UTC
From: Jeff Hostetler <jeffhost@microsoft.com>

Teach fsmonitor--daemon to respond to IPC requests from client
Git processes and respond with a list of modified pathnames
relative to the provided token.

Signed-off-by: Jeff Hostetler <jeffhost@microsoft.com>
Signed-off-by: Junio C Hamano <gitster@pobox.com>
---
 builtin/fsmonitor--daemon.c | 312 +++++++++++++++++++++++++++++++++++-
 1 file changed, 310 insertions(+), 2 deletions(-)

Comments

Ævar Arnfjörð Bjarmason March 22, 2022, 6:30 p.m. UTC | #1
On Tue, Mar 22 2022, Jeff Hostetler via GitGitGadget wrote:

> From: Jeff Hostetler <jeffhost@microsoft.com>
> [...]
> +static int do_handle_client(struct fsmonitor_daemon_state *state,
> +			    const char *command,
> +			    ipc_server_reply_cb *reply,
> +			    struct ipc_server_reply_data *reply_data)
> +{
> +	struct fsmonitor_token_data *token_data = NULL;
> +	struct strbuf response_token = STRBUF_INIT;
> +	struct strbuf requested_token_id = STRBUF_INIT;
> +	struct strbuf payload = STRBUF_INIT;
> +	uint64_t requested_oldest_seq_nr = 0;
> +	uint64_t total_response_len = 0;
> +	const char *p;
> +	const struct fsmonitor_batch *batch_head;
> +	const struct fsmonitor_batch *batch;
> +	intmax_t count = 0, duplicates = 0;
> +	kh_str_t *shown;
> +	int hash_ret;
> +	int do_trivial = 0;
> +	int do_flush = 0;

Just noticed while skimming, isn't initializing do_trivial here (didn't
exhaustively check the others) getting in the way of uninitialized
analysis the compiler will do...

> +
> +	/*
> +	 * We expect `command` to be of the form:
> +	 *
> +	 * <command> := quit NUL
> +	 *            | flush NUL
> +	 *            | <V1-time-since-epoch-ns> NUL
> +	 *            | <V2-opaque-fsmonitor-token> NUL
> +	 */
> +
> +	if (!strcmp(command, "quit")) {
> +		/*
> +		 * A client has requested over the socket/pipe that the
> +		 * daemon shutdown.
> +		 *
> +		 * Tell the IPC thread pool to shutdown (which completes
> +		 * the await in the main thread (which can stop the
> +		 * fsmonitor listener thread)).
> +		 *
> +		 * There is no reply to the client.
> +		 */
> +		return SIMPLE_IPC_QUIT;
> +
> +	} else if (!strcmp(command, "flush")) {
> +		/*
> +		 * Flush all of our cached data and generate a new token
> +		 * just like if we lost sync with the filesystem.
> +		 *
> +		 * Then send a trivial response using the new token.
> +		 */
> +		do_flush = 1;
> +		do_trivial = 1;
> +
> +	} else if (!skip_prefix(command, "builtin:", &p)) {
> +		/* assume V1 timestamp or garbage */
> +
> +		char *p_end;
> +
> +		strtoumax(command, &p_end, 10);
> +		trace_printf_key(&trace_fsmonitor,
> +				 ((*p_end) ?
> +				  "fsmonitor: invalid command line '%s'" :
> +				  "fsmonitor: unsupported V1 protocol '%s'"),
> +				 command);
> +		do_trivial = 1;
> +
> +	} else {
> +		/* We have "builtin:*" */
> +		if (fsmonitor_parse_client_token(command, &requested_token_id,
> +						 &requested_oldest_seq_nr)) {
> +			trace_printf_key(&trace_fsmonitor,
> +					 "fsmonitor: invalid V2 protocol token '%s'",
> +					 command);
> +			do_trivial = 1;
> +
> +		} else {
> +			/*
> +			 * We have a V2 valid token:
> +			 *     "builtin:<token_id>:<seq_nr>"
> +			 */
> +		}
> +	}

Since we'll set it here in all branches except the "else" branch,
i.e. if you move this to "we have a v2 valid" we'll catch future bugs if
we ever have it uninitialized still, 

> +
> +	pthread_mutex_lock(&state->main_lock);
> +
> +	if (!state->current_token_data)
> +		BUG("fsmonitor state does not have a current token");
> +
> +	if (do_flush)
> +		with_lock__do_force_resync(state);
> +
> +	/*
> +	 * We mark the current head of the batch list as "pinned" so
> +	 * that the listener thread will treat this item as read-only
> +	 * (and prevent any more paths from being added to it) from
> +	 * now on.
> +	 */
> +	token_data = state->current_token_data;
> +	batch_head = token_data->batch_head;
> +	((struct fsmonitor_batch *)batch_head)->pinned_time = time(NULL);
> +
> +	/*
> +	 * FSMonitor Protocol V2 requires that we send a response header
> +	 * with a "new current token" and then all of the paths that changed
> +	 * since the "requested token".  We send the seq_nr of the just-pinned
> +	 * head batch so that future requests from a client will be relative
> +	 * to it.
> +	 */
> +	with_lock__format_response_token(&response_token,
> +					 &token_data->token_id, batch_head);
> +
> +	reply(reply_data, response_token.buf, response_token.len + 1);
> +	total_response_len += response_token.len + 1;
> +
> +	trace2_data_string("fsmonitor", the_repository, "response/token",
> +			   response_token.buf);
> +	trace_printf_key(&trace_fsmonitor, "response token: %s",
> +			 response_token.buf);
> +
> +	if (!do_trivial) {

I.e. this would start warning.

> +		if (strcmp(requested_token_id.buf, token_data->token_id.buf)) {
> +			/*
> +			 * The client last spoke to a different daemon
> +			 * instance -OR- the daemon had to resync with
> +			 * the filesystem (and lost events), so reject.
> +			 */
> +			trace2_data_string("fsmonitor", the_repository,
> +					   "response/token", "different");
> +			do_trivial = 1;
> +
> +		} else if (requested_oldest_seq_nr <
> +			   token_data->batch_tail->batch_seq_nr) {
> +			/*
> +			 * The client wants older events than we have for
> +			 * this token_id.  This means that the end of our
> +			 * batch list was truncated and we cannot give the
> +			 * client a complete snapshot relative to their
> +			 * request.
> +			 */
> +			trace_printf_key(&trace_fsmonitor,
> +					 "client requested truncated data");
> +			do_trivial = 1;
> +		}
> +	}
> +
> +	if (do_trivial) {
> +		pthread_mutex_unlock(&state->main_lock);
> +
> +		reply(reply_data, "/", 2);
> +
> +		trace2_data_intmax("fsmonitor", the_repository,
> +				   "response/trivial", 1);
> +
> +		strbuf_release(&response_token);
> +		strbuf_release(&requested_token_id);
> +		return 0;

Nit: instead of strbuf_release() here, just a 'goto cleanup' and ...

> +	}
> +
> +	/*
> +	 * We're going to hold onto a pointer to the current
> +	 * token-data while we walk the list of batches of files.
> +	 * During this time, we will NOT be under the lock.
> +	 * So we ref-count it.
> +	 *
> +	 * This allows the listener thread to continue prepending
> +	 * new batches of items to the token-data (which we'll ignore).
> +	 *
> +	 * AND it allows the listener thread to do a token-reset
> +	 * (and install a new `current_token_data`).
> +	 */
> +	token_data->client_ref_count++;
> +
> +	pthread_mutex_unlock(&state->main_lock);
> +
> +	/*
> +	 * The client request is relative to the token that they sent,
> +	 * so walk the batch list backwards from the current head back
> +	 * to the batch (sequence number) they named.
> +	 *
> +	 * We use khash to de-dup the list of pathnames.
> +	 *
> +	 * NEEDSWORK: each batch contains a list of interned strings,
> +	 * so we only need to do pointer comparisons here to build the
> +	 * hash table.  Currently, we're still comparing the string
> +	 * values.
> +	 */
> +	shown = kh_init_str();
> +	for (batch = batch_head;
> +	     batch && batch->batch_seq_nr > requested_oldest_seq_nr;
> +	     batch = batch->next) {
> +		size_t k;
> +
> +		for (k = 0; k < batch->nr; k++) {
> +			const char *s = batch->interned_paths[k];
> +			size_t s_len;
> +
> +			if (kh_get_str(shown, s) != kh_end(shown))
> +				duplicates++;
> +			else {
> +				kh_put_str(shown, s, &hash_ret);
> +
> +				trace_printf_key(&trace_fsmonitor,
> +						 "send[%"PRIuMAX"]: %s",
> +						 count, s);
> +
> +				/* Each path gets written with a trailing NUL */
> +				s_len = strlen(s) + 1;
> +
> +				if (payload.len + s_len >=
> +				    LARGE_PACKET_DATA_MAX) {
> +					reply(reply_data, payload.buf,
> +					      payload.len);
> +					total_response_len += payload.len;
> +					strbuf_reset(&payload);
> +				}
> +
> +				strbuf_add(&payload, s, s_len);
> +				count++;
> +			}
> +		}
> +	}
> +
> +	if (payload.len) {
> +		reply(reply_data, payload.buf, payload.len);
> +		total_response_len += payload.len;
> +	}
> +
> +	kh_release_str(shown);
> +
> +	pthread_mutex_lock(&state->main_lock);
> +
> +	if (token_data->client_ref_count > 0)
> +		token_data->client_ref_count--;
> +
> +	if (token_data->client_ref_count == 0) {
> +		if (token_data != state->current_token_data) {
> +			/*
> +			 * The listener thread did a token-reset while we were
> +			 * walking the batch list.  Therefore, this token is
> +			 * stale and can be discarded completely.  If we are
> +			 * the last reader thread using this token, we own
> +			 * that work.
> +			 */
> +			fsmonitor_free_token_data(token_data);
> +		}
> +	}
> +
> +	pthread_mutex_unlock(&state->main_lock);
> +
> +	trace2_data_intmax("fsmonitor", the_repository, "response/length", total_response_len);
> +	trace2_data_intmax("fsmonitor", the_repository, "response/count/files", count);
> +	trace2_data_intmax("fsmonitor", the_repository, "response/count/duplicates", duplicates);
> +

add a "cleanup" label here?

> +	strbuf_release(&response_token);
> +	strbuf_release(&requested_token_id);
> +	strbuf_release(&payload);
> +
> +	return 0;
> +}
> +
>  static ipc_server_application_cb handle_client;
Jeff Hostetler March 23, 2022, 2:45 p.m. UTC | #2
On 3/22/22 2:30 PM, Ævar Arnfjörð Bjarmason wrote:
> 
> On Tue, Mar 22 2022, Jeff Hostetler via GitGitGadget wrote:
> 
>> From: Jeff Hostetler <jeffhost@microsoft.com>
>> [...]
>> +static int do_handle_client(struct fsmonitor_daemon_state *state,
>> +			    const char *command,
>> +			    ipc_server_reply_cb *reply,
>> +			    struct ipc_server_reply_data *reply_data)
>> +{
>> +	struct fsmonitor_token_data *token_data = NULL;
>> +	struct strbuf response_token = STRBUF_INIT;
>> +	struct strbuf requested_token_id = STRBUF_INIT;
>> +	struct strbuf payload = STRBUF_INIT;
>> +	uint64_t requested_oldest_seq_nr = 0;
>> +	uint64_t total_response_len = 0;
>> +	const char *p;
>> +	const struct fsmonitor_batch *batch_head;
>> +	const struct fsmonitor_batch *batch;
>> +	intmax_t count = 0, duplicates = 0;
>> +	kh_str_t *shown;
>> +	int hash_ret;
>> +	int do_trivial = 0;
>> +	int do_flush = 0;
> 
> Just noticed while skimming, isn't initializing do_trivial here (didn't
> exhaustively check the others) getting in the way of uninitialized
> analysis the compiler will do...
> 
>> +
>> +	/*
>> +	 * We expect `command` to be of the form:
>> +	 *
>> +	 * <command> := quit NUL
>> +	 *            | flush NUL
>> +	 *            | <V1-time-since-epoch-ns> NUL
>> +	 *            | <V2-opaque-fsmonitor-token> NUL
>> +	 */
>> +
>> +	if (!strcmp(command, "quit")) {
>> +		/*
>> +		 * A client has requested over the socket/pipe that the
>> +		 * daemon shutdown.
>> +		 *
>> +		 * Tell the IPC thread pool to shutdown (which completes
>> +		 * the await in the main thread (which can stop the
>> +		 * fsmonitor listener thread)).
>> +		 *
>> +		 * There is no reply to the client.
>> +		 */
>> +		return SIMPLE_IPC_QUIT;
>> +
>> +	} else if (!strcmp(command, "flush")) {
>> +		/*
>> +		 * Flush all of our cached data and generate a new token
>> +		 * just like if we lost sync with the filesystem.
>> +		 *
>> +		 * Then send a trivial response using the new token.
>> +		 */
>> +		do_flush = 1;
>> +		do_trivial = 1;
>> +
>> +	} else if (!skip_prefix(command, "builtin:", &p)) {
>> +		/* assume V1 timestamp or garbage */
>> +
>> +		char *p_end;
>> +
>> +		strtoumax(command, &p_end, 10);
>> +		trace_printf_key(&trace_fsmonitor,
>> +				 ((*p_end) ?
>> +				  "fsmonitor: invalid command line '%s'" :
>> +				  "fsmonitor: unsupported V1 protocol '%s'"),
>> +				 command);
>> +		do_trivial = 1;
>> +
>> +	} else {
>> +		/* We have "builtin:*" */
>> +		if (fsmonitor_parse_client_token(command, &requested_token_id,
>> +						 &requested_oldest_seq_nr)) {
>> +			trace_printf_key(&trace_fsmonitor,
>> +					 "fsmonitor: invalid V2 protocol token '%s'",
>> +					 command);
>> +			do_trivial = 1;
>> +
>> +		} else {
>> +			/*
>> +			 * We have a V2 valid token:
>> +			 *     "builtin:<token_id>:<seq_nr>"
>> +			 */
>> +		}
>> +	}
> 
> Since we'll set it here in all branches except the "else" branch,
> i.e. if you move this to "we have a v2 valid" we'll catch future bugs if
> we ever have it uninitialized still,

I suppose we could do it that way.  We'd want to do the same
for "do_flush" in that case.  I could go either way on this
change.


> 
>> +
>> +	pthread_mutex_lock(&state->main_lock);
>> +
>> +	if (!state->current_token_data)
>> +		BUG("fsmonitor state does not have a current token");
>> +
>> +	if (do_flush)
>> +		with_lock__do_force_resync(state);
>> +
>> +	/*
>> +	 * We mark the current head of the batch list as "pinned" so
>> +	 * that the listener thread will treat this item as read-only
>> +	 * (and prevent any more paths from being added to it) from
>> +	 * now on.
>> +	 */
>> +	token_data = state->current_token_data;
>> +	batch_head = token_data->batch_head;
>> +	((struct fsmonitor_batch *)batch_head)->pinned_time = time(NULL);
>> +
>> +	/*
>> +	 * FSMonitor Protocol V2 requires that we send a response header
>> +	 * with a "new current token" and then all of the paths that changed
>> +	 * since the "requested token".  We send the seq_nr of the just-pinned
>> +	 * head batch so that future requests from a client will be relative
>> +	 * to it.
>> +	 */
>> +	with_lock__format_response_token(&response_token,
>> +					 &token_data->token_id, batch_head);
>> +
>> +	reply(reply_data, response_token.buf, response_token.len + 1);
>> +	total_response_len += response_token.len + 1;
>> +
>> +	trace2_data_string("fsmonitor", the_repository, "response/token",
>> +			   response_token.buf);
>> +	trace_printf_key(&trace_fsmonitor, "response token: %s",
>> +			 response_token.buf);
>> +
>> +	if (!do_trivial) {
> 
> I.e. this would start warning.
> 
>> +		if (strcmp(requested_token_id.buf, token_data->token_id.buf)) {
>> +			/*
>> +			 * The client last spoke to a different daemon
>> +			 * instance -OR- the daemon had to resync with
>> +			 * the filesystem (and lost events), so reject.
>> +			 */
>> +			trace2_data_string("fsmonitor", the_repository,
>> +					   "response/token", "different");
>> +			do_trivial = 1;
>> +
>> +		} else if (requested_oldest_seq_nr <
>> +			   token_data->batch_tail->batch_seq_nr) {
>> +			/*
>> +			 * The client wants older events than we have for
>> +			 * this token_id.  This means that the end of our
>> +			 * batch list was truncated and we cannot give the
>> +			 * client a complete snapshot relative to their
>> +			 * request.
>> +			 */
>> +			trace_printf_key(&trace_fsmonitor,
>> +					 "client requested truncated data");
>> +			do_trivial = 1;
>> +		}
>> +	}
>> +
>> +	if (do_trivial) {
>> +		pthread_mutex_unlock(&state->main_lock);
>> +
>> +		reply(reply_data, "/", 2);
>> +
>> +		trace2_data_intmax("fsmonitor", the_repository,
>> +				   "response/trivial", 1);
>> +
>> +		strbuf_release(&response_token);
>> +		strbuf_release(&requested_token_id);
>> +		return 0;
> 
> Nit: instead of strbuf_release() here, just a 'goto cleanup' and ...

Yeah, we could do that.  That would also do an unnecesssary release
of "payload", but that's not that bad (or I could order them at the
bottom).

I'm not sure either of these changes are enough for a re-roll,
but I'll include them if I do.

Thanks
Jeff

> 
>> +	}
>> +
>> +	/*
>> +	 * We're going to hold onto a pointer to the current
>> +	 * token-data while we walk the list of batches of files.
>> +	 * During this time, we will NOT be under the lock.
>> +	 * So we ref-count it.
>> +	 *
>> +	 * This allows the listener thread to continue prepending
>> +	 * new batches of items to the token-data (which we'll ignore).
>> +	 *
>> +	 * AND it allows the listener thread to do a token-reset
>> +	 * (and install a new `current_token_data`).
>> +	 */
>> +	token_data->client_ref_count++;
>> +
>> +	pthread_mutex_unlock(&state->main_lock);
>> +
>> +	/*
>> +	 * The client request is relative to the token that they sent,
>> +	 * so walk the batch list backwards from the current head back
>> +	 * to the batch (sequence number) they named.
>> +	 *
>> +	 * We use khash to de-dup the list of pathnames.
>> +	 *
>> +	 * NEEDSWORK: each batch contains a list of interned strings,
>> +	 * so we only need to do pointer comparisons here to build the
>> +	 * hash table.  Currently, we're still comparing the string
>> +	 * values.
>> +	 */
>> +	shown = kh_init_str();
>> +	for (batch = batch_head;
>> +	     batch && batch->batch_seq_nr > requested_oldest_seq_nr;
>> +	     batch = batch->next) {
>> +		size_t k;
>> +
>> +		for (k = 0; k < batch->nr; k++) {
>> +			const char *s = batch->interned_paths[k];
>> +			size_t s_len;
>> +
>> +			if (kh_get_str(shown, s) != kh_end(shown))
>> +				duplicates++;
>> +			else {
>> +				kh_put_str(shown, s, &hash_ret);
>> +
>> +				trace_printf_key(&trace_fsmonitor,
>> +						 "send[%"PRIuMAX"]: %s",
>> +						 count, s);
>> +
>> +				/* Each path gets written with a trailing NUL */
>> +				s_len = strlen(s) + 1;
>> +
>> +				if (payload.len + s_len >=
>> +				    LARGE_PACKET_DATA_MAX) {
>> +					reply(reply_data, payload.buf,
>> +					      payload.len);
>> +					total_response_len += payload.len;
>> +					strbuf_reset(&payload);
>> +				}
>> +
>> +				strbuf_add(&payload, s, s_len);
>> +				count++;
>> +			}
>> +		}
>> +	}
>> +
>> +	if (payload.len) {
>> +		reply(reply_data, payload.buf, payload.len);
>> +		total_response_len += payload.len;
>> +	}
>> +
>> +	kh_release_str(shown);
>> +
>> +	pthread_mutex_lock(&state->main_lock);
>> +
>> +	if (token_data->client_ref_count > 0)
>> +		token_data->client_ref_count--;
>> +
>> +	if (token_data->client_ref_count == 0) {
>> +		if (token_data != state->current_token_data) {
>> +			/*
>> +			 * The listener thread did a token-reset while we were
>> +			 * walking the batch list.  Therefore, this token is
>> +			 * stale and can be discarded completely.  If we are
>> +			 * the last reader thread using this token, we own
>> +			 * that work.
>> +			 */
>> +			fsmonitor_free_token_data(token_data);
>> +		}
>> +	}
>> +
>> +	pthread_mutex_unlock(&state->main_lock);
>> +
>> +	trace2_data_intmax("fsmonitor", the_repository, "response/length", total_response_len);
>> +	trace2_data_intmax("fsmonitor", the_repository, "response/count/files", count);
>> +	trace2_data_intmax("fsmonitor", the_repository, "response/count/duplicates", duplicates);
>> +
> 
> add a "cleanup" label here?
> 
>> +	strbuf_release(&response_token);
>> +	strbuf_release(&requested_token_id);
>> +	strbuf_release(&payload);
>> +
>> +	return 0;
>> +}
>> +
>>   static ipc_server_application_cb handle_client;
diff mbox series

Patch

diff --git a/builtin/fsmonitor--daemon.c b/builtin/fsmonitor--daemon.c
index 69312119b07..fc3aee0ada0 100644
--- a/builtin/fsmonitor--daemon.c
+++ b/builtin/fsmonitor--daemon.c
@@ -7,6 +7,7 @@ 
 #include "fsmonitor--daemon.h"
 #include "simple-ipc.h"
 #include "khash.h"
+#include "pkt-line.h"
 
 static const char * const builtin_fsmonitor__daemon_usage[] = {
 	N_("git fsmonitor--daemon start [<options>]"),
@@ -364,6 +365,311 @@  void fsmonitor_force_resync(struct fsmonitor_daemon_state *state)
 	pthread_mutex_unlock(&state->main_lock);
 }
 
+/*
+ * Format an opaque token string to send to the client.
+ */
+static void with_lock__format_response_token(
+	struct strbuf *response_token,
+	const struct strbuf *response_token_id,
+	const struct fsmonitor_batch *batch)
+{
+	/* assert current thread holding state->main_lock */
+
+	strbuf_reset(response_token);
+	strbuf_addf(response_token, "builtin:%s:%"PRIu64,
+		    response_token_id->buf, batch->batch_seq_nr);
+}
+
+/*
+ * Parse an opaque token from the client.
+ * Returns -1 on error.
+ */
+static int fsmonitor_parse_client_token(const char *buf_token,
+					struct strbuf *requested_token_id,
+					uint64_t *seq_nr)
+{
+	const char *p;
+	char *p_end;
+
+	strbuf_reset(requested_token_id);
+	*seq_nr = 0;
+
+	if (!skip_prefix(buf_token, "builtin:", &p))
+		return -1;
+
+	while (*p && *p != ':')
+		strbuf_addch(requested_token_id, *p++);
+	if (!*p++)
+		return -1;
+
+	*seq_nr = (uint64_t)strtoumax(p, &p_end, 10);
+	if (*p_end)
+		return -1;
+
+	return 0;
+}
+
+KHASH_INIT(str, const char *, int, 0, kh_str_hash_func, kh_str_hash_equal)
+
+static int do_handle_client(struct fsmonitor_daemon_state *state,
+			    const char *command,
+			    ipc_server_reply_cb *reply,
+			    struct ipc_server_reply_data *reply_data)
+{
+	struct fsmonitor_token_data *token_data = NULL;
+	struct strbuf response_token = STRBUF_INIT;
+	struct strbuf requested_token_id = STRBUF_INIT;
+	struct strbuf payload = STRBUF_INIT;
+	uint64_t requested_oldest_seq_nr = 0;
+	uint64_t total_response_len = 0;
+	const char *p;
+	const struct fsmonitor_batch *batch_head;
+	const struct fsmonitor_batch *batch;
+	intmax_t count = 0, duplicates = 0;
+	kh_str_t *shown;
+	int hash_ret;
+	int do_trivial = 0;
+	int do_flush = 0;
+
+	/*
+	 * We expect `command` to be of the form:
+	 *
+	 * <command> := quit NUL
+	 *            | flush NUL
+	 *            | <V1-time-since-epoch-ns> NUL
+	 *            | <V2-opaque-fsmonitor-token> NUL
+	 */
+
+	if (!strcmp(command, "quit")) {
+		/*
+		 * A client has requested over the socket/pipe that the
+		 * daemon shutdown.
+		 *
+		 * Tell the IPC thread pool to shutdown (which completes
+		 * the await in the main thread (which can stop the
+		 * fsmonitor listener thread)).
+		 *
+		 * There is no reply to the client.
+		 */
+		return SIMPLE_IPC_QUIT;
+
+	} else if (!strcmp(command, "flush")) {
+		/*
+		 * Flush all of our cached data and generate a new token
+		 * just like if we lost sync with the filesystem.
+		 *
+		 * Then send a trivial response using the new token.
+		 */
+		do_flush = 1;
+		do_trivial = 1;
+
+	} else if (!skip_prefix(command, "builtin:", &p)) {
+		/* assume V1 timestamp or garbage */
+
+		char *p_end;
+
+		strtoumax(command, &p_end, 10);
+		trace_printf_key(&trace_fsmonitor,
+				 ((*p_end) ?
+				  "fsmonitor: invalid command line '%s'" :
+				  "fsmonitor: unsupported V1 protocol '%s'"),
+				 command);
+		do_trivial = 1;
+
+	} else {
+		/* We have "builtin:*" */
+		if (fsmonitor_parse_client_token(command, &requested_token_id,
+						 &requested_oldest_seq_nr)) {
+			trace_printf_key(&trace_fsmonitor,
+					 "fsmonitor: invalid V2 protocol token '%s'",
+					 command);
+			do_trivial = 1;
+
+		} else {
+			/*
+			 * We have a V2 valid token:
+			 *     "builtin:<token_id>:<seq_nr>"
+			 */
+		}
+	}
+
+	pthread_mutex_lock(&state->main_lock);
+
+	if (!state->current_token_data)
+		BUG("fsmonitor state does not have a current token");
+
+	if (do_flush)
+		with_lock__do_force_resync(state);
+
+	/*
+	 * We mark the current head of the batch list as "pinned" so
+	 * that the listener thread will treat this item as read-only
+	 * (and prevent any more paths from being added to it) from
+	 * now on.
+	 */
+	token_data = state->current_token_data;
+	batch_head = token_data->batch_head;
+	((struct fsmonitor_batch *)batch_head)->pinned_time = time(NULL);
+
+	/*
+	 * FSMonitor Protocol V2 requires that we send a response header
+	 * with a "new current token" and then all of the paths that changed
+	 * since the "requested token".  We send the seq_nr of the just-pinned
+	 * head batch so that future requests from a client will be relative
+	 * to it.
+	 */
+	with_lock__format_response_token(&response_token,
+					 &token_data->token_id, batch_head);
+
+	reply(reply_data, response_token.buf, response_token.len + 1);
+	total_response_len += response_token.len + 1;
+
+	trace2_data_string("fsmonitor", the_repository, "response/token",
+			   response_token.buf);
+	trace_printf_key(&trace_fsmonitor, "response token: %s",
+			 response_token.buf);
+
+	if (!do_trivial) {
+		if (strcmp(requested_token_id.buf, token_data->token_id.buf)) {
+			/*
+			 * The client last spoke to a different daemon
+			 * instance -OR- the daemon had to resync with
+			 * the filesystem (and lost events), so reject.
+			 */
+			trace2_data_string("fsmonitor", the_repository,
+					   "response/token", "different");
+			do_trivial = 1;
+
+		} else if (requested_oldest_seq_nr <
+			   token_data->batch_tail->batch_seq_nr) {
+			/*
+			 * The client wants older events than we have for
+			 * this token_id.  This means that the end of our
+			 * batch list was truncated and we cannot give the
+			 * client a complete snapshot relative to their
+			 * request.
+			 */
+			trace_printf_key(&trace_fsmonitor,
+					 "client requested truncated data");
+			do_trivial = 1;
+		}
+	}
+
+	if (do_trivial) {
+		pthread_mutex_unlock(&state->main_lock);
+
+		reply(reply_data, "/", 2);
+
+		trace2_data_intmax("fsmonitor", the_repository,
+				   "response/trivial", 1);
+
+		strbuf_release(&response_token);
+		strbuf_release(&requested_token_id);
+		return 0;
+	}
+
+	/*
+	 * We're going to hold onto a pointer to the current
+	 * token-data while we walk the list of batches of files.
+	 * During this time, we will NOT be under the lock.
+	 * So we ref-count it.
+	 *
+	 * This allows the listener thread to continue prepending
+	 * new batches of items to the token-data (which we'll ignore).
+	 *
+	 * AND it allows the listener thread to do a token-reset
+	 * (and install a new `current_token_data`).
+	 */
+	token_data->client_ref_count++;
+
+	pthread_mutex_unlock(&state->main_lock);
+
+	/*
+	 * The client request is relative to the token that they sent,
+	 * so walk the batch list backwards from the current head back
+	 * to the batch (sequence number) they named.
+	 *
+	 * We use khash to de-dup the list of pathnames.
+	 *
+	 * NEEDSWORK: each batch contains a list of interned strings,
+	 * so we only need to do pointer comparisons here to build the
+	 * hash table.  Currently, we're still comparing the string
+	 * values.
+	 */
+	shown = kh_init_str();
+	for (batch = batch_head;
+	     batch && batch->batch_seq_nr > requested_oldest_seq_nr;
+	     batch = batch->next) {
+		size_t k;
+
+		for (k = 0; k < batch->nr; k++) {
+			const char *s = batch->interned_paths[k];
+			size_t s_len;
+
+			if (kh_get_str(shown, s) != kh_end(shown))
+				duplicates++;
+			else {
+				kh_put_str(shown, s, &hash_ret);
+
+				trace_printf_key(&trace_fsmonitor,
+						 "send[%"PRIuMAX"]: %s",
+						 count, s);
+
+				/* Each path gets written with a trailing NUL */
+				s_len = strlen(s) + 1;
+
+				if (payload.len + s_len >=
+				    LARGE_PACKET_DATA_MAX) {
+					reply(reply_data, payload.buf,
+					      payload.len);
+					total_response_len += payload.len;
+					strbuf_reset(&payload);
+				}
+
+				strbuf_add(&payload, s, s_len);
+				count++;
+			}
+		}
+	}
+
+	if (payload.len) {
+		reply(reply_data, payload.buf, payload.len);
+		total_response_len += payload.len;
+	}
+
+	kh_release_str(shown);
+
+	pthread_mutex_lock(&state->main_lock);
+
+	if (token_data->client_ref_count > 0)
+		token_data->client_ref_count--;
+
+	if (token_data->client_ref_count == 0) {
+		if (token_data != state->current_token_data) {
+			/*
+			 * The listener thread did a token-reset while we were
+			 * walking the batch list.  Therefore, this token is
+			 * stale and can be discarded completely.  If we are
+			 * the last reader thread using this token, we own
+			 * that work.
+			 */
+			fsmonitor_free_token_data(token_data);
+		}
+	}
+
+	pthread_mutex_unlock(&state->main_lock);
+
+	trace2_data_intmax("fsmonitor", the_repository, "response/length", total_response_len);
+	trace2_data_intmax("fsmonitor", the_repository, "response/count/files", count);
+	trace2_data_intmax("fsmonitor", the_repository, "response/count/duplicates", duplicates);
+
+	strbuf_release(&response_token);
+	strbuf_release(&requested_token_id);
+	strbuf_release(&payload);
+
+	return 0;
+}
+
 static ipc_server_application_cb handle_client;
 
 static int handle_client(void *data,
@@ -371,7 +677,7 @@  static int handle_client(void *data,
 			 ipc_server_reply_cb *reply,
 			 struct ipc_server_reply_data *reply_data)
 {
-	/* struct fsmonitor_daemon_state *state = data; */
+	struct fsmonitor_daemon_state *state = data;
 	int result;
 
 	/*
@@ -382,10 +688,12 @@  static int handle_client(void *data,
 	if (command_len != strlen(command))
 		BUG("FSMonitor assumes text messages");
 
+	trace_printf_key(&trace_fsmonitor, "requested token: %s", command);
+
 	trace2_region_enter("fsmonitor", "handle_client", the_repository);
 	trace2_data_string("fsmonitor", the_repository, "request", command);
 
-	result = 0; /* TODO Do something here. */
+	result = do_handle_client(state, command, reply, reply_data);
 
 	trace2_region_leave("fsmonitor", "handle_client", the_repository);