Next step on the road to data replication. At this point the interface with Chunk through CLD is basically done. Signed-off-by: Pete Zaitcev <zaitcev@xxxxxxxxxx> diff --git a/server/cldu.c b/server/cldu.c index f1c2d4d..57332e3 100644 --- a/server/cldu.c +++ b/server/cldu.c @@ -10,16 +10,18 @@ #include <string.h> #include <unistd.h> #include <event.h> -#include <netdb.h> -#include <arpa/nameser.h> -#include <netinet/in.h> -#include <resolv.h> /* must follow in.h, nameser.h */ #include <errno.h> #include <cldc.h> +#include <elist.h> #include "tabled.h" #define ALIGN8(n) ((8 - ((n) & 7)) & 7) +struct chunk_node { + struct list_head link; + char name[65]; +}; + #define N_CLD 10 /* 5 * (v4+v6) */ struct cld_host { @@ -35,11 +37,18 @@ struct cld_session { int actx; /* Active host cldv[actx] */ struct cld_host cldv[N_CLD]; + char *thiscell; struct event ev; /* Associated with fd */ char *cfname; /* /tabled-cell directory */ struct cldc_fh *cfh; /* /tabled-cell directory, keep open for scan */ - char *ffname; /* /tabled-cell/ourhost */ - struct cldc_fh *ffh; /* /tabled-cell/ourhost, keep open for lock */ + char *ffname; /* /tabled-cell/thishost */ + struct cldc_fh *ffh; /* /tabled-cell/thishost, keep open for lock */ + char *xfname; /* /chunk-cell directory */ + struct cldc_fh *xfh; /* /chunk-cell directory */ + char *yfname; /* /chunk-cell/NID file */ + struct cldc_fh *yfh; /* /chunk-cell/NID file */ + + struct list_head chunks; /* found in xfname, struct chunk_node */ void (*state_cb)(enum st_cld); }; @@ -47,14 +56,19 @@ struct cld_session { static int cldu_set_cldc(struct cld_session *sp, int newactive); static int cldu_new_sess(struct cldc_call_opts *carg, enum cle_err_codes errc); static int cldu_open_c_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); -#if 0 -static int cldu_close_c_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); -#endif static int cldu_open_f_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); static int cldu_lock_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); static int cldu_put_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); -static void add_remote(char *name); +static int cldu_open_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); +static int cldu_get_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); +static int cldu_close_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); +static void next_chunk(struct cld_session *sp); +static int cldu_open_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); +static int cldu_get_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); +static int cldu_close_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); +static void add_remote(const char *name); +static void add_chunk_node(struct cld_session *sp, const char *name); /* * Identify the next host to be tried. @@ -78,45 +92,35 @@ static int cldu_nextactive(struct cld_session *sp) return sp->actx; } -static int cldu_setcell(struct cld_session *sp, const char *thiscell) +static int cldu_setcell(struct cld_session *sp, + const char *thiscell, const char *thishost) { - size_t cnlen; - size_t mlen; char *mem; if (thiscell == NULL) { thiscell = "default"; } - cnlen = strlen(thiscell); - -#if 0 /* old way without directories */ - cnlen = strlen(thiscell); - mlen = sizeof("/tabled-")-1; - mlen += cnlen; - mlen += sizeof("-master")-1; - mlen++; - mem = malloc(mlen); - sprintf(mem, "/tabled-%s-master", thiscell); - sp->mfname = mem; -#endif + sp->thiscell = strdup(thiscell); + if (!sp->thiscell) + goto err_oom; - mlen = sizeof("/tabled-")-1; - mlen += cnlen; - mlen++; // '\0' - mem = malloc(mlen); - sprintf(mem, "/tabled-%s", thiscell); + if (asprintf(&mem, "/tabled-%s", thiscell) == -1) + goto err_oom; sp->cfname = mem; - mlen = sizeof("/tabled-")-1; - mlen += cnlen; - mlen++; // '/' - mlen += strlen(tabled_srv.ourhost); - mlen++; // '\0' - mem = malloc(mlen); - sprintf(mem, "/tabled-%s/%s", thiscell, tabled_srv.ourhost); + if (asprintf(&mem, "/tabled-%s/%s", thiscell, thishost) == -1) + goto err_oom; sp->ffname = mem; + if (asprintf(&mem, "/chunk-%s", thiscell) == -1) + goto err_oom; + sp->xfname = mem; + + return 0; + +err_oom: + applog(LOG_WARNING, "OOM in cldu"); return 0; } @@ -334,19 +338,6 @@ static int cldu_open_c_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) if (debugging) applog(LOG_DEBUG, "CLD directory \"%s\" created", sp->cfname); -#if 0 /* Don't close the directory, we'll rescan later instead */ - /* - * We don't use directory handle to open files in it, so close it. - */ - memset(&copts, 0, sizeof(copts)); - copts.cb = cldu_close_c_cb; - copts.private = sp; - rc = cldc_close(sp->cfh, &copts); - if (rc) { - applog(LOG_ERR, "cldc_close call error %d", rc); - } -#endif - /* * Then, create the membership file for us. */ @@ -362,36 +353,6 @@ static int cldu_open_c_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) return 0; } -#if 0 -static int cldu_close_c_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) -{ - struct cld_session *sp = carg->private; - struct cldc_call_opts copts; - int rc; - - if (errc != CLE_OK) { - applog(LOG_ERR, "CLD close(%s) failed: %d", sp->cfname, errc); - return 0; - } - -/* P3 */ applog(LOG_INFO, "CLD close success, opening %s", sp->ffname); - - /* - * Then, create the membership file for us. - */ - memset(&copts, 0, sizeof(copts)); - copts.cb = cldu_open_f_cb; - copts.private = sp; - rc = cldc_open(sp->lib->sess, &copts, sp->ffname, - COM_WRITE | COM_LOCK | COM_CREATE, - CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->ffh); - if (rc) { - applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->ffname, rc); - } - return 0; -} -#endif - static int cldu_open_f_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) { struct cld_session *sp = carg->private; @@ -437,7 +398,7 @@ static int cldu_lock_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) int rc; if (errc != CLE_OK) { - applog(LOG_ERR, "CLD lock(%s) failed: %d", sp->cfname, errc); + applog(LOG_ERR, "CLD lock(%s) failed: %d", sp->ffname, errc); return 0; } @@ -490,8 +451,8 @@ static int cldu_put_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) { struct cld_session *sp = carg->private; - // struct cldc_call_opts copts; - // int rc; + struct cldc_call_opts copts; + int rc; const char *ptr; int dir_len; int total_len, rec_len, name_len; @@ -518,7 +479,7 @@ static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) else buf[64] = 0; - if (!strcmp(buf, tabled_srv.ourhost)) { + if (!strcmp(buf, tabled_srv.ourhost)) { /* use thishost XXX */ if (debugging) applog(LOG_DEBUG, " %s (ourselves)", buf); } else { @@ -534,13 +495,249 @@ static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) if (sp->state_cb) (*sp->state_cb)(ST_CLD_ACTIVE); + /* + * Now we can collect the Chunk nodes in our cell. + */ + memset(&copts, 0, sizeof(copts)); + copts.cb = cldu_open_x_cb; + copts.private = sp; + rc = cldc_open(sp->lib->sess, &copts, sp->xfname, + COM_READ | COM_DIRECTORY, + CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->xfh); + if (rc) { + applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->xfname, rc); + } + return 0; +} + +static int cldu_open_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) +{ + struct cld_session *sp = carg->private; + struct cldc_call_opts copts; + int rc; + + if (errc != CLE_OK) { + applog(LOG_ERR, "CLD open(%s) failed: %d", sp->xfname, errc); + /* XXX recycle, maybe Chunks aren't up yet. */ + return 0; + } + if (sp->xfh == NULL) { + applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->xfname); + return 0; + } + if (!sp->xfh->valid) { + applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->xfname); + return 0; + } + + // if (debugging) + applog(LOG_DEBUG, "CLD directory \"%s\" opened", sp->xfname); + + /* + * Read the directory. + */ + memset(&copts, 0, sizeof(copts)); + copts.cb = cldu_get_x_cb; + copts.private = sp; + rc = cldc_get(sp->xfh, &copts, false); + if (rc) { + applog(LOG_ERR, "cldc_get(%s) call error: %d", sp->cfname, rc); + } + return 0; +} + +static int cldu_get_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) +{ + struct cld_session *sp = carg->private; + struct cldc_call_opts copts; + int rc; + const char *ptr; + int dir_len; + int total_len, rec_len, name_len; + char buf[65]; + + if (errc != CLE_OK) { + applog(LOG_ERR, "CLD get(%s) failed: %d", sp->xfname, errc); + return 0; + } + + if (debugging) + applog(LOG_DEBUG, "Known Chunk nodes"); + + ptr = carg->u.get.buf; + dir_len = carg->u.get.size; + while (dir_len) { + name_len = GUINT16_FROM_LE(*(uint16_t *)ptr); + rec_len = name_len + 2; + total_len = rec_len + ALIGN8(rec_len); + + strncpy(buf, ptr+2, 64); + if (name_len < 64) + buf[name_len] = 0; + else + buf[64] = 0; + + if (debugging) + applog(LOG_DEBUG, " %s", buf); + add_chunk_node(sp, buf); + + ptr += total_len; + dir_len -= total_len; + } + + memset(&copts, 0, sizeof(copts)); + copts.cb = cldu_close_x_cb; + copts.private = sp; + rc = cldc_close(sp->xfh, &copts); + if (rc) { + applog(LOG_ERR, "cldc_close call error %d", rc); + } + return 0; +} + +static int cldu_close_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) +{ + struct cld_session *sp = carg->private; + // struct cldc_call_opts copts; + // int rc; + + if (errc != CLE_OK) { + applog(LOG_ERR, "CLD close(%s) failed: %d", sp->xfname, errc); + return 0; + } + + if (list_empty(&sp->chunks)) + applog(LOG_INFO, "No Chunk nodes found"); + else + next_chunk(sp); + return 0; +} + +static void next_chunk(struct cld_session *sp) +{ + struct chunk_node *np; + char *mem; + struct cldc_call_opts copts; + int rc; + + np = list_entry(sp->chunks.next, struct chunk_node, link); + + if (asprintf(&mem, "/chunk-%s/%s", sp->thiscell, np->name) == -1) { + applog(LOG_WARNING, "OOM in cldu"); + return; + } + sp->yfname = mem; + + if (debugging) + applog(LOG_DEBUG, "opening chunk parameters %s", sp->yfname); + + memset(&copts, 0, sizeof(copts)); + copts.cb = cldu_open_y_cb; + copts.private = sp; + rc = cldc_open(sp->lib->sess, &copts, sp->yfname, + COM_READ, + CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->yfh); + if (rc) { + applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->yfname, rc); + } +} + +static int cldu_open_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) +{ + struct cld_session *sp = carg->private; + struct cldc_call_opts copts; + int rc; + + if (errc != CLE_OK) { + applog(LOG_ERR, "CLD open(%s) failed: %d", sp->yfname, errc); + free(sp->yfname); + sp->yfname = NULL; + return 0; + } + if (sp->yfh == NULL) { + applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->yfname); + free(sp->yfname); + sp->yfname = NULL; + return 0; + } + if (!sp->yfh->valid) { + applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->yfname); + free(sp->yfname); + sp->yfname = NULL; + return 0; + } + + /* + * Read the Chunk's parameter file. + */ + memset(&copts, 0, sizeof(copts)); + copts.cb = cldu_get_y_cb; + copts.private = sp; + rc = cldc_get(sp->yfh, &copts, false); + if (rc) { + applog(LOG_ERR, "cldc_get(%s) call error: %d", sp->yfname, rc); + } + return 0; +} + +static int cldu_get_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) +{ + struct cld_session *sp = carg->private; + struct cldc_call_opts copts; + int rc; + const char *ptr; + int len; + + if (errc != CLE_OK) { + applog(LOG_ERR, "CLD get(%s) failed: %d", sp->yfname, errc); + goto close_and_next; /* spaghetti */ + } + + ptr = carg->u.get.buf; + len = carg->u.get.size; + if (debugging) + applog(LOG_DEBUG, + "got %d bytes from %s\n", len, sp->yfname); + stor_add_node(ptr, len); + +close_and_next: + memset(&copts, 0, sizeof(copts)); + copts.cb = cldu_close_y_cb; + copts.private = sp; + rc = cldc_close(sp->yfh, &copts); + if (rc) { + applog(LOG_ERR, "cldc_close call error %d", rc); + } + return 0; +} + +static int cldu_close_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) +{ + struct cld_session *sp = carg->private; + struct chunk_node *np; + // struct cldc_call_opts copts; + // int rc; + + if (errc != CLE_OK) { + applog(LOG_ERR, "CLD close(%s) failed: %d", sp->yfname, errc); + return 0; + } + + free(sp->yfname); + sp->yfname = NULL; + + np = list_entry(sp->chunks.next, struct chunk_node, link); + list_del(&np->link); + + if (!list_empty(&sp->chunks)) + next_chunk(sp); return 0; } /* * FIXME need to read port number from the file (port:<space>num). */ -static void add_remote(char *name) +static void add_remote(const char *name) { struct db_remote *rp; @@ -559,6 +756,23 @@ static void add_remote(char *name) } /* + * Save the available chunk numbers into a local list temporarily, + * because we don't want to hog the event thread with reading. + */ +static void add_chunk_node(struct cld_session *sp, const char *name) +{ + struct chunk_node *np; + + np = malloc(sizeof(*np)); + if (!np) + return; + + strncpy(np->name, name, sizeof(np->name)-1); + np->name[sizeof(np->name)-1] = 0; + list_add_tail(&np->link, &sp->chunks); +} + +/* */ static struct cld_session ses; @@ -569,6 +783,9 @@ int cld_begin(const char *thishost, const char *thiscell, void (*cb)(enum st_cld)) { + cldc_init(); + INIT_LIST_HEAD(&ses.chunks); + /* * As long as we permit pre-seeding lists of CLD hosts, * we cannot wipe our session anymore. Note though, as long @@ -577,7 +794,7 @@ int cld_begin(const char *thishost, const char *thiscell, // memset(&ses, 0, sizeof(struct cld_session)); ses.state_cb = cb; - if (cldu_setcell(&ses, thiscell)) { + if (cldu_setcell(&ses, thiscell, thishost)) { /* Already logged error */ goto err_cell; } @@ -651,6 +868,9 @@ void cld_end(void) free(ses.cfname); free(ses.ffname); + free(ses.xfname); + free(ses.yfname); + free(ses.thiscell); } void cldu_add_host(const char *hostname, unsigned int port) diff --git a/server/storage.c b/server/storage.c index cf07b41..8a621b5 100644 --- a/server/storage.c +++ b/server/storage.c @@ -309,6 +309,14 @@ bool stor_obj_test(struct open_chunk *cep, uint64_t key) } /* + * Add a node using its parameters file (not nul-terminated). + */ +void stor_add_node(const char *data, size_t len) +{ + /* P3 */ applog(LOG_INFO, "Adding some node or sumthin."); +} + +/* * Wait for chunkd instances to come up, in case we start simultaneously * from a "make check" or a parallel boot script on the same computer, * of if a datacenter is being brought up. diff --git a/server/tabled.h b/server/tabled.h index d12d032..4c7cd95 100644 --- a/server/tabled.h +++ b/server/tabled.h @@ -312,6 +312,7 @@ extern bool stor_put_end(struct open_chunk *cep); extern ssize_t stor_get_buf(struct open_chunk *cep, void *data, size_t len); extern int stor_obj_del(struct storage_node *stn, uint64_t key); extern bool stor_obj_test(struct open_chunk *cep, uint64_t key); +extern void stor_add_node(const char *data, size_t len); extern void stor_init(void); #endif /* __TABLED_H__ */ diff --git a/test/start-daemon b/test/start-daemon index bb7ee82..85ef700 100755 --- a/test/start-daemon +++ b/test/start-daemon @@ -18,12 +18,10 @@ fi # May be different on Solaris... like /usr/libexec or such. cld -d data/cld -P cld.pid -p 18081 -E - -sleep 3 - chunkd -P chunkd.pid -f $top_srcdir/test/chunkd-test.conf -E -sleep 3 +# 3 is enough, but we like to let chunkd to come up eary and register with CLD +sleep 7 ../server/tabled -P tabled.pid -C $top_srcdir/test/tabled-test.conf -E -- To unsubscribe from this list: send the line "unsubscribe hail-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html