On 4/1/2021 11:40 AM, Jeff Hostetler via GitGitGadget wrote:
> From: Jeff Hostetler <jeffhost@xxxxxxxxxxxxx>
>
> Teach fsmonitor--daemon to build lists of changed paths and associate
> them with a token-id. This will be used by the platform-specific
> backends to accumulate changed paths in response to filesystem events.
>
> The platform-specific event loops receive batches containing one or
> more changed paths. Their fs listener thread will accumulate them in

I think the lowercase "fs" here is strange. "Their listener thread"
could be interpreted as the IPC listener, so it's probably best to
spell it out: "Their filesystem listener thread".

> a `fsmonitor_batch` (and without locking) and then "publish" them to
> associate them with the current token and to make them visible to the
> client worker threads.

...

> +struct fsmonitor_batch {
> +	struct fsmonitor_batch *next;
> +	uint64_t batch_seq_nr;
> +	const char **interned_paths;
> +	size_t nr, alloc;
> +	time_t pinned_time;
> +};

A linked list to allow adding entries while the list is being
consumed, but with batching for efficiency. I can see how this will
work out nicely.

> +struct fsmonitor_batch *fsmonitor_batch__new(void)
> +{
> +	struct fsmonitor_batch *batch = xcalloc(1, sizeof(*batch));

I mentioned earlier that I think `CALLOC_ARRAY(batch, 1)` is the
typical pattern here.
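Something like this, I think (just sketching the suggested call;
assuming the CALLOC_ARRAY() macro from git-compat-util.h, which
should expand to the same xcalloc() call):

	struct fsmonitor_batch *batch;

	CALLOC_ARRAY(batch, 1);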
> +
> +	return batch;
> +}
> +
> +struct fsmonitor_batch *fsmonitor_batch__free(struct fsmonitor_batch *batch)

Since this method frees the tip of the list and returns the next item
(instead of freeing the entire list), perhaps this would be better
named _pop()?

> +{
> +	struct fsmonitor_batch *next;
> +
> +	if (!batch)
> +		return NULL;
> +
> +	next = batch->next;
> +
> +	/*
> +	 * The actual strings within the array are interned, so we don't
> +	 * own them.
> +	 */
> +	free(batch->interned_paths);
> +
> +	return next;
> +}
> +
> +void fsmonitor_batch__add_path(struct fsmonitor_batch *batch,
> +			       const char *path)
> +{
> +	const char *interned_path = strintern(path);

This use of interned paths is interesting, although I am concerned
about the amount of memory we consume over the lifetime of the
process. This could be considered a target for future improvement,
perhaps with an LRU cache or something similar.

> +
> +	trace_printf_key(&trace_fsmonitor, "event: %s", interned_path);
> +
> +	ALLOC_GROW(batch->interned_paths, batch->nr + 1, batch->alloc);
> +	batch->interned_paths[batch->nr++] = interned_path;
> +}
> +
> +static void fsmonitor_batch__combine(struct fsmonitor_batch *batch_dest,
> +				     const struct fsmonitor_batch *batch_src)
> +{
> +	/* assert state->main_lock */
> +

This comment seems stale.

> +	size_t k;
> +
> +	ALLOC_GROW(batch_dest->interned_paths,
> +		   batch_dest->nr + batch_src->nr + 1,
> +		   batch_dest->alloc);
> +
> +	for (k = 0; k < batch_src->nr; k++)
> +		batch_dest->interned_paths[batch_dest->nr++] =
> +			batch_src->interned_paths[k];
> +}
> +
> +static void fsmonitor_free_token_data(struct fsmonitor_token_data *token)

This one _does_ free the whole list.

> +{
> +	struct fsmonitor_batch *p;
> +
> +	if (!token)
> +		return;
> +
> +	assert(token->client_ref_count == 0);
> +
> +	strbuf_release(&token->token_id);
> +
> +	for (p = token->batch_head; p; p = fsmonitor_batch__free(p))
> +		;
> +
> +	free(token);
> +}
> +
> +/*
> + * Flush all of our cached data about the filesystem. Call this if we
> + * lose sync with the filesystem and miss some notification events.
> + *
> + * [1] If we are missing events, then we no longer have a complete
> + *     history of the directory (relative to our current start token).
> + *     We should create a new token and start fresh (as if we just
> + *     booted up).
> + *
> + * If there are no readers of the the current token data series, we
> + * can free it now. Otherwise, let the last reader free it. Either
> + * way, the old token data series is no longer associated with our
> + * state data.
> + */
> +void fsmonitor_force_resync(struct fsmonitor_daemon_state *state)
> +{
> +	struct fsmonitor_token_data *free_me = NULL;
> +	struct fsmonitor_token_data *new_one = NULL;
> +
> +	new_one = fsmonitor_new_token_data();
> +
> +	pthread_mutex_lock(&state->main_lock);
> +
> +	trace_printf_key(&trace_fsmonitor,
> +			 "force resync [old '%s'][new '%s']",
> +			 state->current_token_data->token_id.buf,
> +			 new_one->token_id.buf);
> +
> +	if (state->current_token_data->client_ref_count == 0)
> +		free_me = state->current_token_data;
> +	state->current_token_data = new_one;
> +
> +	pthread_mutex_unlock(&state->main_lock);
> +
> +	fsmonitor_free_token_data(free_me);
> +}
> +

Swap the pointer under a lock, free outside of it. Good.

> +/*
> + * We try to combine small batches at the front of the batch-list to avoid
> + * having a long list. This hopefully makes it a little easier when we want
> + * to truncate and maintain the list. However, we don't want the paths array
> + * to just keep growing and growing with realloc, so we insert an arbitrary
> + * limit.
> + */
> +#define MY_COMBINE_LIMIT (1024)
> +
> +void fsmonitor_publish(struct fsmonitor_daemon_state *state,
> +		       struct fsmonitor_batch *batch,
> +		       const struct string_list *cookie_names)
> +{
> +	if (!batch && !cookie_names->nr)
> +		return;
> +
> +	pthread_mutex_lock(&state->main_lock);
> +
> +	if (batch) {
> +		struct fsmonitor_batch *head;
> +
> +		head = state->current_token_data->batch_head;
> +		if (!head) {
> +			batch->batch_seq_nr = 0;
> +			batch->next = NULL;
> +			state->current_token_data->batch_head = batch;
> +			state->current_token_data->batch_tail = batch;
> +		} else if (head->pinned_time) {
> +			/*
> +			 * We cannot alter the current batch list
> +			 * because:
> +			 *
> +			 * [a] it is being transmitted to at least one
> +			 * client and the handle_client() thread has a
> +			 * ref-count, but not a lock on the batch list
> +			 * starting with this item.
> +			 *
> +			 * [b] it has been transmitted in the past to
> +			 * at least one client such that future
> +			 * requests are relative to this head batch.
> +			 *
> +			 * So, we can only prepend a new batch onto
> +			 * the front of the list.
> +			 */
> +			batch->batch_seq_nr = head->batch_seq_nr + 1;
> +			batch->next = head;
> +			state->current_token_data->batch_head = batch;
> +		} else if (head->nr + batch->nr > MY_COMBINE_LIMIT) {
> +			/*
> +			 * The head batch in the list has never been
> +			 * transmitted to a client, but folding the
> +			 * contents of the new batch onto it would
> +			 * exceed our arbitrary limit, so just prepend
> +			 * the new batch onto the list.
> +			 */
> +			batch->batch_seq_nr = head->batch_seq_nr + 1;
> +			batch->next = head;
> +			state->current_token_data->batch_head = batch;
> +		} else {
> +			/*
> +			 * We are free to append the paths in the given
> +			 * batch onto the end of the current head batch.
> +			 */
> +			fsmonitor_batch__combine(head, batch);
> +			fsmonitor_batch__free(batch);
> +		}
> +	}
> +
> +	pthread_mutex_unlock(&state->main_lock);
> +}

I appreciate the careful comments in this critical piece of the data
structure.
Also, it is good that you already have a batch of results to merge
into the list instead of taking the lock for every filesystem event.

Thanks,
-Stolee