When some paths are watched, they are added to the "watched" list in the file watcher. When a path in this list is updated, the path is moved to the "changed" list and is no longer watched. With this patch we have a complete picture of the path exchange between git and file-watcher: 1) Handshake 2) Get the list of changed paths and clear CE_WATCHED on these paths. Set CE_VALID on the remaining CE_WATCHED paths 3) (Optionally) Ask to watch more paths. Set CE_WATCHED on them. CE_VALID is not set, so these are still lstat'd 4) Refresh as usual. lstat is skipped on CE_VALID paths. If any of the paths from step 3 is found modified, its CE_WATCHED is removed. 5) Write the index to disk. Notify file-watcher about the new index signature. Ask file-watcher to remove the "changed paths". A few points: - The changed list remains until step 5. If git crashes or does not write the index down, the next time it starts it will be fed the same changed list. - If git writes the index down without telling file-watcher about it, the next time it starts the handshake should fail and git should clear all CE_WATCHED. - There's a buffer between starting the watch at #3 and saving the watch at #5. We do verify that paths are clean at #4. By that time all watches should have been active for a while. No chance for race conditions. - #5 is sort of atomic. If git crashes halfway through step 5, file-watcher should not update its index signature, which means the next time git starts, the handshake fails (because a new index has been written) so we'll start over. 
Signed-off-by: Nguyễn Thái Ngọc Duy <pclouds@xxxxxxxxx> --- cache.h | 1 + file-watcher-lib.c | 99 ++++++++++++++++++++++++++++++++++++++ file-watcher-lib.h | 1 + file-watcher.c | 138 +++++++++++++++++++++++++++++++++++++++++++++++++++++ read-cache.c | 21 +++++++- 5 files changed, 258 insertions(+), 2 deletions(-) diff --git a/cache.h b/cache.h index 10ff33e..9f7d952 100644 --- a/cache.h +++ b/cache.h @@ -285,6 +285,7 @@ struct index_state { struct hash_table dir_hash; unsigned char sha1[20]; int watcher; + struct string_list *updated_entries; }; extern struct index_state the_index; diff --git a/file-watcher-lib.c b/file-watcher-lib.c index d4949a5..b6b0848 100644 --- a/file-watcher-lib.c +++ b/file-watcher-lib.c @@ -2,6 +2,7 @@ #include "file-watcher-lib.h" #include "pkt-line.h" #include "unix-socket.h" +#include "string-list.h" static char *watcher_path; static int WAIT_TIME = 50; /* in ms */ @@ -25,6 +26,11 @@ static int connect_watcher(const char *path) static void reset_watches(struct index_state *istate, int disconnect) { int i, changed = 0; + if (istate->updated_entries) { + string_list_clear(istate->updated_entries, 0); + free(istate->updated_entries); + istate->updated_entries = NULL; + } for (i = 0; i < istate->cache_nr; i++) if (istate->cache[i]->ce_flags & CE_WATCHED) { istate->cache[i]->ce_flags &= ~(CE_WATCHED | CE_VALID); @@ -41,6 +47,58 @@ static void reset_watches(struct index_state *istate, int disconnect) } } +static void mark_ce_valid(struct index_state *istate) +{ + struct strbuf sb = STRBUF_INIT; + char *line, *end; + int i, len; + unsigned long n; + if (packet_write_timeout(istate->watcher, WAIT_TIME, "get-changed") <= 0 || + !(line = packet_read_line_timeout(istate->watcher, WAIT_TIME, &len)) || + !starts_with(line, "changed ")) { + reset_watches(istate, 1); + return; + } + n = strtoul(line + 8, &end, 10); + if (end != line + len) { + reset_watches(istate, 1); + return; + } + if (!n) + goto done; + strbuf_grow(&sb, n); + if 
(read_in_full_timeout(istate->watcher, sb.buf, n, WAIT_TIME) != n) { + strbuf_release(&sb); + reset_watches(istate, 1); + return; + } + line = sb.buf; + end = line + n; + for (; line < end; line += len + 1) { + len = strlen(line); + i = index_name_pos(istate, line, len); + if (i < 0) + continue; + if (istate->cache[i]->ce_flags & CE_WATCHED) { + istate->cache[i]->ce_flags &= ~CE_WATCHED; + istate->cache_changed = 1; + } + if (!istate->updated_entries) { + struct string_list *sl; + sl = xmalloc(sizeof(*sl)); + memset(sl, 0, sizeof(*sl)); + sl->strdup_strings = 1; + istate->updated_entries = sl; + } + string_list_append(istate->updated_entries, line); + } + strbuf_release(&sb); +done: + for (i = 0; i < istate->cache_nr; i++) + if (istate->cache[i]->ce_flags & CE_WATCHED) + istate->cache[i]->ce_flags |= CE_VALID; +} + static int watcher_config(const char *var, const char *value, void *data) { if (!strcmp(var, "filewatcher.path")) { @@ -110,6 +168,8 @@ void open_watcher(struct index_state *istate) istate->update_watches = 1; return; } + + mark_ce_valid(istate); } static int sort_by_date(const void *a_, const void *b_) @@ -200,3 +260,42 @@ void watch_entries(struct index_state *istate) send_watches(istate, sorted, nr); free(sorted); } + +void close_watcher(struct index_state *istate, const unsigned char *sha1) +{ + struct strbuf sb = STRBUF_INIT; + int len, i, nr; + if (istate->watcher <= 0) + return; + if (packet_write_timeout(istate->watcher, WAIT_TIME, + "new-index %s", sha1_to_hex(sha1)) <= 0) + goto done; + nr = istate->updated_entries ? 
istate->updated_entries->nr : 0; + if (!nr) { + packet_write_timeout(istate->watcher, WAIT_TIME, "unchange 0"); + goto done; + } + for (i = len = 0; i < nr; i++) { + const char *s = istate->updated_entries->items[i].string; + len += strlen(s) + 1; + } + if (packet_write_timeout(istate->watcher, WAIT_TIME, + "unchange %d", len) <= 0) + goto done; + strbuf_grow(&sb, len); + for (i = 0; i < nr; i++) { + const char *s = istate->updated_entries->items[i].string; + int len = strlen(s); + strbuf_add(&sb, s, len + 1); + } + /* + * it does not matter if it fails anymore, we're closing + * down. If it only gets through partially, file watcher + * should ignore it. + */ + write_in_full_timeout(istate->watcher, sb.buf, sb.len, WAIT_TIME); + strbuf_release(&sb); +done: + close(istate->watcher); + istate->watcher = -1; +} diff --git a/file-watcher-lib.h b/file-watcher-lib.h index 1641024..df68a73 100644 --- a/file-watcher-lib.h +++ b/file-watcher-lib.h @@ -3,5 +3,6 @@ void open_watcher(struct index_state *istate); void watch_entries(struct index_state *istate); +void close_watcher(struct index_state *istate, const unsigned char *sha1); #endif diff --git a/file-watcher.c b/file-watcher.c index c257414..aa2daf6 100644 --- a/file-watcher.c +++ b/file-watcher.c @@ -3,6 +3,7 @@ #include "parse-options.h" #include "exec_cmd.h" #include "unix-socket.h" +#include "string-list.h" #include "pkt-line.h" static const char *const file_watcher_usage[] = { @@ -21,6 +22,9 @@ struct repository { * is probably enough for this case. 
*/ ino_t inode; + struct string_list updated; + int updated_sorted; + int updating; }; const char *invalid_signature = "0000000000000000000000000000000000000000"; @@ -31,6 +35,8 @@ static int nr_repos; struct connection { int sock, polite; struct repository *repo; + + char new_index[41]; }; static struct connection **conns; @@ -42,6 +48,24 @@ static int watch_path(struct repository *repo, char *path) return -1; } +static void get_changed_list(int conn_id) +{ + struct strbuf sb = STRBUF_INIT; + int i, size, fd = conns[conn_id]->sock; + struct repository *repo = conns[conn_id]->repo; + + for (i = size = 0; i < repo->updated.nr; i++) + size += strlen(repo->updated.items[i].string) + 1; + packet_write(fd, "changed %d", size); + if (!size) + return; + strbuf_grow(&sb, size); + for (i = 0; i < repo->updated.nr; i++) + strbuf_add(&sb, repo->updated.items[i].string, + strlen(repo->updated.items[i].string) + 1); + write_in_full(fd, sb.buf, sb.len); +} + static inline uint64_t stamp(void) { struct timeval tv; @@ -101,6 +125,43 @@ static void watch_paths(int conn_id, char *buf, int maxlen) packet_write(conns[conn_id]->sock, "watched %u", n); } +static void unchange(int conn_id, unsigned long size) +{ + struct connection *conn = conns[conn_id]; + struct repository *repo = conn->repo; + if (size) { + struct strbuf sb = STRBUF_INIT; + char *p; + int len; + strbuf_grow(&sb, size); + if (read_in_full(conn->sock, sb.buf, size) <= 0) + return; + if (!repo->updated_sorted) { + sort_string_list(&repo->updated); + repo->updated_sorted = 1; + } + for (p = sb.buf; p - sb.buf < size; p += len + 1) { + struct string_list_item *item; + len = strlen(p); + item = string_list_lookup(&repo->updated, p); + if (!item) + continue; + unsorted_string_list_delete_item(&repo->updated, + item - repo->updated.items, 0); + } + strbuf_release(&sb); + } + memcpy(repo->index_signature, conn->new_index, 40); + /* + * If other connections on this repo are in some sort of + * session that depend on the 
previous repository state, we + * may need to disconnect them to be safe. + */ + + /* pfd[0] is the listening socket, can't be a connection */ + repo->updating = 0; +} + static struct repository *get_repo(const char *work_tree) { int first, last; @@ -129,12 +190,14 @@ static struct repository *get_repo(const char *work_tree) memset(repo, 0, sizeof(*repo)); repo->work_tree = xstrdup(work_tree); memset(repo->index_signature, '0', 40); + repo->updated.strdup_strings = 1; repos[first] = repo; return repo; } static void reset_repo(struct repository *repo, ino_t inode) { + string_list_clear(&repo->updated, 0); memcpy(repo->index_signature, invalid_signature, 40); repo->inode = inode; } @@ -147,6 +210,8 @@ static int shutdown_connection(int id) if (!conn) return 0; close(conn->sock); + if (conn->repo && conn->repo->updating == id) + conn->repo->updating = 0; free(conn); return 0; } @@ -268,6 +333,77 @@ static int handle_command(int conn_id) } watch_paths(conn_id, msg + 6, len - 6); } + + /* + * > "get-changed" + * < changed SP LENGTH + * < PATH-LIST + * + * When watched path gets updated, the path is moved from + * "watched" list to "changed" list and is no longer watched. + * This command get the list of changed paths. PATH-LIST is + * also sent if LENGTH is non-zero. + */ + else if (!strcmp(msg, "get-changed")) { + if (!conns[conn_id]->repo) { + packet_write(fd, "error have not received index command"); + return shutdown_connection(conn_id); + } + get_changed_list(conn_id); + } + + /* + * > "new-index" INDEX-SIGNATURE + * > "unchange" SP LENGTH + * > PATH-LIST + * + * "new-index" passes new index signature from the + * client. "unchange" sends the list of paths to be removed + * from "changed" list. + * + * "new-index" must be sent before "unchange". File watcher + * waits until the last "unchange" line, then update its index + * signature as well as "changed" list. 
+ */ + else if (starts_with(msg, "new-index ")) { + if (len != 50) { + packet_write(fd, "error invalid new-index line %s", msg); + return shutdown_connection(conn_id); + } + if (!conns[conn_id]->repo) { + packet_write(fd, "error have not received index command"); + return shutdown_connection(conn_id); + } + if (conns[conn_id]->repo->updating == conn_id) { + packet_write(fd, "error received new-index command more than once"); + return shutdown_connection(conn_id); + } + memcpy(conns[conn_id]->new_index, msg + 10, 40); + /* + * if updating is non-zero the other client will get + * disconnected at the next "unchange" command because + * "updating" no longer points to its connection. + */ + conns[conn_id]->repo->updating = conn_id; + } + else if (skip_prefix(msg, "unchange ")) { + unsigned long n; + char *end; + n = strtoul(msg + 9, &end, 10); + if (end != msg + len) { + packet_write(fd, "error invalid unchange line %s", msg); + return shutdown_connection(conn_id); + } + if (!conns[conn_id]->repo) { + packet_write(fd, "error have not received index command"); + return shutdown_connection(conn_id); + } + if (conns[conn_id]->repo->updating != conn_id) { + packet_write(fd, "error have not received new-index command"); + return shutdown_connection(conn_id); + } + unchange(conn_id, n); + } else { packet_write(fd, "error unrecognized command %s", msg); return shutdown_connection(conn_id); @@ -436,6 +572,8 @@ int main(int argc, const char **argv) if (!conns[i]) continue; if (i != new_nr) { /* pfd[] is shrunk, move pfd[i] up */ + if (conns[i]->repo && conns[i]->repo->updating == i) + conns[i]->repo->updating = new_nr; conns[new_nr] = conns[i]; pfd[new_nr] = pfd[i]; } diff --git a/read-cache.c b/read-cache.c index dc49858..5540b06 100644 --- a/read-cache.c +++ b/read-cache.c @@ -1567,6 +1567,11 @@ int discard_index(struct index_state *istate) free(istate->cache); istate->cache = NULL; istate->cache_alloc = 0; + if (istate->updated_entries) { + 
string_list_clear(istate->updated_entries, 0); + free(istate->updated_entries); + istate->updated_entries = NULL; + } return 0; } @@ -1627,7 +1632,7 @@ static int write_index_ext_header(git_SHA_CTX *context, int fd, (ce_write(context, fd, &sz, 4) < 0)) ? -1 : 0; } -static int ce_flush(git_SHA_CTX *context, int fd) +static int ce_flush(git_SHA_CTX *context, int fd, unsigned char *sha1) { unsigned int left = write_buffer_len; @@ -1645,6 +1650,8 @@ static int ce_flush(git_SHA_CTX *context, int fd) /* Append the SHA1 signature at the end */ git_SHA1_Final(write_buffer + left, context); + if (sha1) + hashcpy(sha1, write_buffer + left); left += 20; return (write_in_full(fd, write_buffer, left) != left) ? -1 : 0; } @@ -1809,12 +1816,21 @@ int write_index(struct index_state *istate, int newfd) int entries = istate->cache_nr; struct stat st; struct strbuf previous_name_buf = STRBUF_INIT, *previous_name; + unsigned char sha1[20]; for (i = removed = extended = 0; i < entries; i++) { if (cache[i]->ce_flags & CE_REMOVE) removed++; else if (cache[i]->ce_flags & CE_WATCHED) { /* + * CE_VALID when used with CE_WATCHED is not + * supposed to be persistent. Next time git + * runs, if this entry is still watched and + * nothing has changed, CE_VALID will be + * reinstated. + */ + cache[i]->ce_flags &= ~CE_VALID; + /* * We may set CE_WATCHED (but not CE_VALID) * early when refresh has not been done * yet. At that time we had no idea if the @@ -1922,8 +1938,9 @@ int write_index(struct index_state *istate, int newfd) return -1; } - if (ce_flush(&c, newfd) || fstat(newfd, &st)) + if (ce_flush(&c, newfd, sha1) || fstat(newfd, &st)) return -1; + close_watcher(istate, sha1); istate->timestamp.sec = (unsigned int)st.st_mtime; istate->timestamp.nsec = ST_MTIME_NSEC(st); return 0; -- 1.8.5.2.240.g8478abd -- To unsubscribe from this list: send the line "unsubscribe git" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html