On Tue, 10 Aug 2010 15:14:19 -0400
Jeff Garzik <jeff@xxxxxxxxxx> wrote:

> On 08/05/2010 11:40 PM, Pete Zaitcev wrote:
> > The metadata replication in tabled nominally existed, but did not
> > work. There were a couple of small bugs (for example, an attempt to
> > boot directly into Slave state would lead to a hang). However, the
> > biggest problem was that the identity of a node in the Replication
> > Manager API had to be the same as its hostname. The repmgr code then
> > used the hostname to bind instead of a wildcard socket. On any stock
> > Fedora or RHEL system, this meant it would end up listening on
> > loopback only, because /etc/hosts aliased the hostname to the
> > loopback address. Thus running any replication required additional
> > host configuration that could have unexpected consequences.
> > In addition, it was impossible to run two nodes on one host for testing.
> >
> > This patch does away with the Replication Manager and uses Base API
> > instead. This way, issues with host aliasing are addressed, and
> > the state transitions occur much faster because there is no voting.
> >
> > Note that a provision is added to run peers on the same host,
> > using a configuration clause TDBRepName. I was unable to come up with
> > a reliable way to make persistent, nonconflicting identifiers that
> > would replace hostnames. Fortunately, this should only be used
> > for build tests, where we probably can live with it.
> >
> > The resulting replication feature was tested to work. Not sure if
> > it is enough to trust it with one's data, but it's better than before.
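
For anyone who wants to check the hostname aliasing problem on their own
box, here is a minimal sketch. It is not code from the patch, and all
three helper names are made up for illustration. It assumes only the
standard getaddrinfo() interface: the first two helpers report whether a
name resolves to loopback addresses only, and the third shows that a
passive lookup with a NULL node gives the wildcard address regardless of
how the hostname resolves.

```c
/* Illustration only: these helpers are hypothetical, not part of tabled. */
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200112L
#endif
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>

/* Return 1 if the address is loopback (127.0.0.0/8 or ::1), else 0. */
static int addr_is_loopback(const struct sockaddr *sa)
{
	assert(sa != NULL);
	if (sa->sa_family == AF_INET) {
		const struct sockaddr_in *in4 = (const struct sockaddr_in *)sa;
		return (ntohl(in4->sin_addr.s_addr) >> 24) == 127;
	}
	if (sa->sa_family == AF_INET6) {
		const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *)sa;
		return IN6_IS_ADDR_LOOPBACK(&in6->sin6_addr) ? 1 : 0;
	}
	return 0;
}

/*
 * Resolve "host" as a bind-by-name would. Returns 1 if every resolved
 * address is loopback, 0 if at least one is not, -1 if resolution fails.
 */
static int name_binds_loopback_only(const char *host)
{
	struct addrinfo hints, *res, *ai;
	int only_loopback = 1;

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	if (getaddrinfo(host, NULL, &hints, &res) != 0)
		return -1;
	for (ai = res; ai; ai = ai->ai_next) {
		if (!addr_is_loopback(ai->ai_addr))
			only_loopback = 0;
	}
	freeaddrinfo(res);
	return only_loopback;
}

/*
 * Wildcard alternative: a NULL node with AI_PASSIVE yields INADDR_ANY.
 * Returns 1 if the resolved IPv4 address is the wildcard, 0 otherwise.
 */
static int passive_addr_is_wildcard(const char *port)
{
	struct addrinfo hints, *res;
	int is_any;

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = AF_INET;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_PASSIVE;
	if (getaddrinfo(NULL, port, &hints, &res) != 0)
		return 0;
	is_any = ((const struct sockaddr_in *)res->ai_addr)->sin_addr.s_addr
	    == htonl(INADDR_ANY);
	freeaddrinfo(res);
	return is_any;
}
```

On a host where /etc/hosts aliases the hostname to 127.0.0.1, calling
name_binds_loopback_only() with that hostname should return 1, which is
the situation described in the quoted text. The wildcard lookup does not
depend on the hostname at all.
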
> >
> > Signed-off-by: Pete Zaitcev <zaitcev@xxxxxxxxxx>
> >
> > ---
> >  doc/etc.tabled.conf |    8
> >  doc/setup.txt       |   13 +
> >  include/tdb.h       |   15 -
> >  lib/tdb.c           |  130 ++++++-------
> >  server/Makefile.am  |    2
> >  server/bucket.c     |   34 +--
> >  server/cldu.c       |  416 ++++++++++++++++++++++++++++++++++++------
> >  server/config.c     |   10 +
> >  server/object.c     |   22 +-
> >  server/replica.c    |    8
> >  server/server.c     |  404 ++++++++++++++++++++++++++++++++++++----
> >  server/tabled.h     |   97 +++++++++
> >  server/tdbadm.c     |   51 +----
> >  13 files changed, 963 insertions(+), 247 deletions(-)
>
> Including metarep.c would be helpful ;-)
>
> Will wait on release for this...

Sorry!

Unfortunately this is getting a little behind, because I started
working on the tests and they require some changes (e.g. the listening
host and port may be unknown at the time MASTER file is locked).

---
 doc/etc.tabled.conf |    8
 doc/setup.txt       |   13
 include/tdb.h       |   15
 lib/tdb.c           |  130 ++--
 server/Makefile.am  |    2
 server/bucket.c     |   34 -
 server/cldu.c       |  416 ++++++++++++--
 server/config.c     |   10
 server/metarep.c    | 1245 ++++++++++++++++++++++++++++++++++++++++++
 server/object.c     |   22
 server/replica.c    |    8
 server/server.c     |  404 ++++++++++++-
 server/tabled.h     |   97 +++
 server/tdbadm.c     |   51 -
 14 files changed, 2208 insertions(+), 247 deletions(-)

diff --git a/doc/etc.tabled.conf b/doc/etc.tabled.conf
index 22d20a7..c3b1d1d 100644
--- a/doc/etc.tabled.conf
+++ b/doc/etc.tabled.conf
@@ -13,12 +13,12 @@
 <!--
  One group per DB, don't skimp on groups. Also, make sure the replication
- ports do not conflict when you make hosts to host several groups.
- Unfortunately, the diagnostics are not very good if they do.
- Most likely you'll see database corruption in such cases.
+ ports do not conflict when you make boxes to host several groups or use
+ replication instances iwth TDBRepName.
--> <Group>ultracart2</Group> -<TDB>/path/tabled/tdb</TDB> +<TDB>/path/tabled-uc2/</TDB> <!-- mkdir -p /path/tabled-uc2 --> +<!-- <TDBRepName>12345.my_local_node_name.example.com</TDBRepName> --> <TDBRepPort>8083</TDBRepPort> <!-- diff --git a/doc/setup.txt b/doc/setup.txt index ac0dfb0..c7a4c6a 100644 --- a/doc/setup.txt +++ b/doc/setup.txt @@ -15,7 +15,9 @@ _cld._udp.phx2.ex.com has SRV record 10 50 8081 maika.phx2.ex.com. Also, make sure that your hostname has a domain. We don't want to search for CLD in the world-wide DNS root, do we? - Make sure CLD is up (run "cldcli" to verify). + Once you know that CLD is running, verify that tabled can talk to + it by running "cldcli". UDP traffic to be allowed for port 8081 or + other port as specified in the SRV record. *) Another thing to set up in DNS is a wildcard host for the system where tabled will run. Unlike the SRV records of CLD, this is optional, but @@ -30,6 +32,10 @@ emus3 IN A 192.168.128.9 All examples on Google say FQDN is required, and most presume aliasing of A and AAAA records, but BIND 9 eats the above fine. +*) Speaking of FQDN, it is possible to force tabled to use a non-default + hostname with ForceHost tag. In practice this is only useful when + the DNS is broken. + *) Copy configuration file from doc/etc.tabled.conf to /etc/tabled.conf and edit to suit (see configurable items below). Notice that the file looks like XML, but is not really. In particular, names of elements are @@ -53,6 +59,11 @@ emus3 IN A 192.168.128.9 Group name defaults to "default", so you can leave this element unset, but don't do it. Any name, even "qwerty", is better than the default. +*) In each group, tabled uses its hostname to identify itself. However, + if you ever wish to run two tabled processes that serve the same group, + it can be accomplished by setting TDBRepName. N.B.: A loss of power for + the host will knock out all of them, so never use this in production. + *) Select the port to listen, if desired. 
This is done using the <Listen> element: diff --git a/include/tdb.h b/include/tdb.h index 8895704..ff3b4b5 100644 --- a/include/tdb.h +++ b/include/tdb.h @@ -109,15 +109,12 @@ struct tabledb { DB *oids; /* object ID db */ }; -struct db_remote { /* remotes for tdb_init */ - char *host; - unsigned short port; -}; - -extern int tdb_init(struct tabledb *tdb, const char *home, const char *pass, - unsigned int env_flags, const char *errpfx, bool do_syslog, - GList *remotes, char *rep_host, unsigned short rep_port, - void (*cb)(enum db_event)); +extern int tdb_init(struct tabledb *tdb, const char *db_home, + const char *db_password, const char *errpfx, bool do_syslog, + int rep_our_id, + int (*rep_send)(DB_ENV *dbenv, const DBT *ctl, const DBT *rec, + const DB_LSN *lsnp, int envid, uint32_t flags), + bool we_are_master, void (*cb)(enum db_event)); extern int tdb_up(struct tabledb *tdb, unsigned int open_flags); extern void tdb_down(struct tabledb *tdb); extern void tdb_fini(struct tabledb *tdb); diff --git a/lib/tdb.c b/lib/tdb.c index bc5e50a..29a18f0 100644 --- a/lib/tdb.c +++ b/lib/tdb.c @@ -1,6 +1,6 @@ /* - * Copyright 2008-2009 Red Hat, Inc. + * Copyright 2008-2010 Red Hat, Inc. 
* * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -148,35 +148,15 @@ err_out: return -EIO; } -static int add_remote_sites(DB_ENV *dbenv, GList *remotes, int *nsites) -{ - int rc; - struct db_remote *rp; - GList *tmp; - - *nsites = 0; - for (tmp = remotes; tmp; tmp = tmp->next) { - rp = tmp->data; - - rc = dbenv->repmgr_add_remote_site(dbenv, rp->host, rp->port, - NULL, 0); - if (rc) { - dbenv->err(dbenv, rc, - "dbenv->add.remote.site host %s port %u", - rp->host, rp->port); - return rc; - } - (*nsites)++; - } - - return 0; -} - static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info) { struct tabledb *tdb = dbenv->app_private; switch (event) { + case DB_EVENT_PANIC: + dbenv->errx(dbenv, "PANIC event is reported, exiting"); + exit(2); + break; case DB_EVENT_REP_CLIENT: tdb->is_master = false; if (tdb->state_cb) @@ -191,6 +171,14 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info) if (tdb->state_cb) (*tdb->state_cb)(TDB_EV_ELECTED); break; + case DB_EVENT_REP_NEWMASTER: + dbenv->errx(dbenv, "New master is reported: %d", + *(int *)event_info); + /* XXX Need to verify that it's the same master as before. 
*/ + break; + case DB_EVENT_REP_STARTUPDONE: + dbenv->errx(dbenv, "Client start-up complete"); + break; default: /* do nothing */ break; @@ -202,15 +190,18 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info) * db_password, cb can be NULL */ int tdb_init(struct tabledb *tdb, const char *db_home, const char *db_password, - unsigned int env_flags, const char *errpfx, bool do_syslog, - GList *remotes, char *rep_host, unsigned short rep_port, + const char *errpfx, bool do_syslog, int rep_ourid, + int (*rep_send)(DB_ENV *dbenv, const DBT *ctl, const DBT *rec, + const DB_LSN *lsnp, int envid, uint32_t flags), + bool we_are_master, void (*cb)(enum db_event)) { - int nsites; + unsigned int env_flags; + unsigned int rep_flags; int rc; DB_ENV *dbenv; - tdb->is_master = false; + tdb->is_master = we_are_master; tdb->home = db_home; tdb->state_cb = cb; @@ -258,12 +249,6 @@ int tdb_init(struct tabledb *tdb, const char *db_home, const char *db_password, tdb->keyed = true; } - rc = dbenv->repmgr_set_local_site(dbenv, rep_host, rep_port, 0); - if (rc) { - dbenv->err(dbenv, rc, "repmgr_set_local_site"); - goto err_out; - } - rc = dbenv->set_event_notify(dbenv, db4_event); if (rc) { dbenv->err(dbenv, rc, "set_event_notify"); @@ -283,42 +268,65 @@ int tdb_init(struct tabledb *tdb, const char *db_home, const char *db_password, // goto err_out; // } - rc = dbenv->rep_set_priority(dbenv, 100); - if (rc) { - dbenv->err(dbenv, rc, "rep_set_priority"); - goto err_out; - } + if (rep_send) { + rc = dbenv->rep_set_transport(dbenv, rep_ourid, rep_send); + if (rc) { + dbenv->err(dbenv, rc, "rep_set_transport"); + goto err_out; + } - /* init DB transactional environment, stored in directory db_home */ - env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL; - env_flags |= DB_INIT_TXN | DB_INIT_REP; - rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR); - if (rc) { - dbenv->err(dbenv, rc, "open(dbenv)"); - goto err_out; - } + // /* + // * Fix the derbies. 
This is the only way, since passing of + // * DB_REP_MASTER to rep_start() after a failover will end in: + // * "DB_REP_UNAVAIL: Unable to elect a master" (and a hang). + // */ + // rc = dbenv->rep_set_priority(dbenv, we_are_master ? 100 : 10); + // if (rc) { + // dbenv->err(dbenv, rc, "rep_set_priority"); + // goto err_out; + // } + + env_flags = DB_RECOVER | DB_CREATE | DB_THREAD; + env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL; + env_flags |= DB_INIT_TXN | DB_INIT_REP; + rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR); + if (rc) { + dbenv->err(dbenv, rc, "open rep"); + goto err_out; + } - rc = add_remote_sites(dbenv, remotes, &nsites); - if (rc) - goto err_out; + rep_flags = we_are_master ? DB_REP_MASTER : DB_REP_CLIENT; + rc = dbenv->rep_start(dbenv, NULL, rep_flags); + if (rc) { + dbenv->err(dbenv, rc, "rep_start"); + goto err_out; + } - // rc = dbenv->rep_set_nsites(dbenv, nsites + 1); - // if (rc) { - // dbenv->err(dbenv, rc, "rep_set_nsites"); - // goto err_out; - // } + } else { + env_flags = DB_RECOVER | DB_CREATE | DB_THREAD; + env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL; + env_flags |= DB_INIT_TXN; + rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR); + if (rc) { + dbenv->err(dbenv, rc, "open norep"); + goto err_out; + } - rc = dbenv->repmgr_start(dbenv, 2, DB_REP_ELECTION); - if (rc) { - dbenv->err(dbenv, rc, "repmgr_start"); - goto err_out; + /* XXX rip this out from tdbadm.c */ + /* + * The db4 only delivers callbacks if replication was ordered. + * Since we force-set master, we ought to deliver them here + * for the universal code to work as if a master was elected. + */ + if (cb) + (*cb)(we_are_master ? 
TDB_EV_MASTER : TDB_EV_CLIENT); } return 0; err_out: dbenv->close(dbenv, 0); - return rc; + return -1; } /* diff --git a/server/Makefile.am b/server/Makefile.am index 6397245..5b53a0a 100644 --- a/server/Makefile.am +++ b/server/Makefile.am @@ -4,7 +4,7 @@ INCLUDES = -I$(top_srcdir)/include @GLIB_CFLAGS@ @HAIL_CFLAGS@ sbin_PROGRAMS = tabled tdbadm tabled_SOURCES = tabled.h \ - bucket.c cldu.c config.c object.c replica.c \ + bucket.c cldu.c config.c metarep.c object.c replica.c \ server.c status.c storage.c storparse.c util.c tabled_LDADD = ../lib/libtdb.a \ @HAIL_LIBS@ @PCRE_LIBS@ @GLIB_LIBS@ \ diff --git a/server/bucket.c b/server/bucket.c index a95d23e..eb03e03 100644 --- a/server/bucket.c +++ b/server/bucket.c @@ -43,11 +43,11 @@ bool has_access(const char *user, const char *bucket, const char *key, size_t alloc_len, key_len = 0; struct db_acl_key *acl_key; struct db_acl_ent *acl; - DB_ENV *dbenv = tdb.env; + DB_ENV *dbenv = tdbrep.tdb.env; DB_TXN *txn = NULL; DBT pkey, pval; DBC *cur = NULL; - DB *acls = tdb.acls; + DB *acls = tdbrep.tdb.acls; if (user == NULL) user = DB_ACL_ANON; @@ -132,7 +132,7 @@ err_out: static int add_access_user(DB_TXN *txn, const char *bucket, const char *key, const char *user, const char *perms) { - DB *acls = tdb.acls; + DB *acls = tdbrep.tdb.acls; int key_len; int acl_len; struct db_acl_ent *acl; @@ -203,8 +203,8 @@ bool service_list(struct client *cli, const char *user) bool rcb; DB_TXN *txn = NULL; DBC *cur = NULL; - DB_ENV *dbenv = tdb.env; - DB *bidx = tdb.buckets_idx; + DB_ENV *dbenv = tdbrep.tdb.env; + DB *bidx = tdbrep.tdb.buckets_idx; DBT skey, pkey, pval; if (asprintf(&s, @@ -348,7 +348,7 @@ bool bucket_valid(const char *bucket) static int bucket_find(DB_TXN *txn, const char *bucket, char *owner, int owner_len) { - DB *buckets = tdb.buckets; + DB *buckets = tdbrep.tdb.buckets; DBT key, val; struct db_bucket_ent ent; int rc; @@ -455,9 +455,9 @@ bool bucket_add(struct client *cli, const char *user, const char *bucket) struct 
db_bucket_ent ent; bool setacl; /* is ok to put pre-existing bucket */ enum ReqACLC canacl; - DB *buckets = tdb.buckets; - DB *acls = tdb.acls; - DB_ENV *dbenv = tdb.env; + DB *buckets = tdbrep.tdb.buckets; + DB *acls = tdbrep.tdb.acls; + DB_ENV *dbenv = tdbrep.tdb.env; DB_TXN *txn = NULL; DBT key, val; @@ -589,11 +589,11 @@ bool bucket_del(struct client *cli, const char *user, const char *bucket) enum errcode err = InternalError; int rc; struct db_bucket_ent ent; - DB_ENV *dbenv = tdb.env; + DB_ENV *dbenv = tdbrep.tdb.env; DB_TXN *txn = NULL; - DB *buckets = tdb.buckets; - DB *acls = tdb.acls; - DB *objs = tdb.objs; + DB *buckets = tdbrep.tdb.buckets; + DB *acls = tdbrep.tdb.acls; + DB *objs = tdbrep.tdb.objs; DBC *cur = NULL; DBT key, val; char structbuf[sizeof(struct db_acl_key) + 32]; @@ -922,9 +922,9 @@ static bool bucket_list_keys(struct client *cli, const char *user, size_t pfx_len; struct bucket_list_info bli; bool rcb; - DB_ENV *dbenv = tdb.env; + DB_ENV *dbenv = tdbrep.tdb.env; DB_TXN *txn = NULL; - DB *objs = tdb.objs; + DB *objs = tdbrep.tdb.objs; DBC *cur = NULL; DBT pkey, pval; struct db_obj_key *obj_key; @@ -1159,8 +1159,8 @@ bool access_list(struct client *cli, const char *bucket, const char *key, GHashTable *param; enum errcode err = InternalError; - DB_ENV *dbenv = tdb.env; - DB *acls = tdb.acls; + DB_ENV *dbenv = tdbrep.tdb.env; + DB *acls = tdbrep.tdb.acls; int alloc_len; char owner[64]; GList *res; diff --git a/server/cldu.c b/server/cldu.c index 5f3631b..45a6a83 100644 --- a/server/cldu.c +++ b/server/cldu.c @@ -35,6 +35,8 @@ #define ALIGN8(n) ((8 - ((n) & 7)) & 7) +#define MASTER_FILE "MASTER" + struct chunk_node { struct list_head link; char name[65]; @@ -63,18 +65,22 @@ struct cld_session { int actx; /* Active host cldv[actx] */ struct cld_host cldv[N_CLD]; + char *thisname; char *thisgroup; char *thishost; char *cfname; /* /tabled-group directory */ struct ncld_fh *cfh; /* /tabled-group directory, keep open for scan */ - char *ffname; /* 
/tabled-group/thishost */ - struct ncld_fh *ffh; /* /tabled-group/thishost, keep open for lock */ + char *ffname; /* /tabled-group/thisname */ + struct ncld_fh *ffh; /* /tabled-group/thisname, keep open for lock */ + char *mfname; /* /tabled-group/MASTER */ + struct ncld_fh *mfh; /* /tabled-group/MASTER, keep open for lock */ char *xfname; /* /chunk-GROUP directory */ struct list_head chunks; /* found in xfname, struct chunk_node */ }; static int cldu_set_cldc(struct cld_session *sp, int newactive); +static int scan_peers(struct cld_session *sp); static int scan_chunks(struct cld_session *sp); static void next_chunk(struct cld_session *sp, struct chunk_node *np); static void add_remote(const char *name); @@ -113,13 +119,17 @@ static int cldu_nextactive(struct cld_session *sp) * chunkservers that it uses, so this function only takes one group argument. */ static int cldu_setgroup(struct cld_session *sp, - const char *thisgroup, const char *thishost) + const char *thisgroup, const char *thishost, + const char *thisname) { char *mem; if (thisgroup == NULL) { thisgroup = "default"; } + if (thisname == NULL) { + thisname = thishost; + } sp->thisgroup = strdup(thisgroup); if (!sp->thisgroup) @@ -127,15 +137,22 @@ static int cldu_setgroup(struct cld_session *sp, sp->thishost = strdup(thishost); if (!sp->thishost) goto err_oom; + sp->thisname = strdup(thisname); + if (!sp->thisname) + goto err_oom; if (asprintf(&mem, "/tabled-%s", thisgroup) == -1) goto err_oom; sp->cfname = mem; - if (asprintf(&mem, "/tabled-%s/%s", thisgroup, thishost) == -1) + if (asprintf(&mem, "/tabled-%s/%s", thisgroup, thisname) == -1) goto err_oom; sp->ffname = mem; + if (asprintf(&mem, "/tabled-%s/%s", thisgroup, MASTER_FILE) == -1) + goto err_oom; + sp->mfname = mem; + if (asprintf(&mem, "/chunk-%s", thisgroup) == -1) goto err_oom; sp->xfname = mem; @@ -147,6 +164,259 @@ err_oom: return 0; } +/* + * Ugh, side effects on tabled_srv.rep_master. 
+ */ +static void cldu_parse_master(const char *mfname, const char *mfile, long len) +{ + enum lex_state { lex_tag, lex_colon, lex_val }; + const char *tag, *val; + int taglen; + const char *name, *host, *port; + int namelen, hostlen, portlen; + char namebuf[65], hostbuf[65], portbuf[15]; + long portnum; + enum lex_state state; + struct db_remote *rp; + const char *p; + char c; + + name = NULL; + namelen = 0; + host = NULL; + hostlen = 0; + port = NULL; + portlen = 0; + + p = mfile; + tag = p; + val = NULL; + state = lex_tag; + for (;;) { + if (p >= mfile+len) + break; + c = *p++; + if (state == lex_tag) { + if (c == ':') { + val = p; + state = lex_colon; + taglen = (p-1) - tag; + } else if (c == '\n') { + if (debugging) + applog(LOG_DEBUG, + "%s: No colon", mfname); + tag = p; + val = NULL; + state = lex_tag; + } + } else if (state == lex_colon) { + if (c == ' ') { + val = p; + } else if (c == '\n') { + if (debugging) + applog(LOG_DEBUG, + "%s: Empty value", mfname); + tag = p; + val = NULL; + state = lex_tag; + } else { + state = lex_val; + } + } else if (state == lex_val) { + if (c == '\n') { + if (taglen == sizeof("name")-1 && + memcmp(tag, "name", taglen) == 0) { + name = val; + namelen = (p-1) - val; + } else if (taglen == sizeof("host")-1 && + memcmp(tag, "host", taglen) == 0) { + host = val; + hostlen = (p-1) - val; + } else if (taglen == sizeof("port")-1 && + memcmp(tag, "port", taglen) == 0) { + port = val; + portlen = (p-1) - val; + } else { + if (debugging) + applog(LOG_DEBUG, + "%s: Unknown tag %c[%d]", + mfname, tag[0], taglen); + } + tag = p; + val = NULL; + state = lex_tag; + } + } else { + return; + } + } + + if (!name || !namelen) { + if (debugging) + applog(LOG_DEBUG, "%s: No name", mfname); + return; + } + if (!host || !hostlen) { + if (debugging) + applog(LOG_DEBUG, "%s: No host", mfname); + return; + } + if (!port || !portlen) { + if (debugging) + applog(LOG_DEBUG, "%s: No port", mfname); + return; + } + + if (namelen >= sizeof(namebuf)) { + 
applog(LOG_ERR, "Long master name"); + return; + } + memcpy(namebuf, name, namelen); + namebuf[namelen] = 0; + + if (hostlen >= sizeof(hostbuf)) { + applog(LOG_ERR, "Long host"); + return; + } + memcpy(hostbuf, host, hostlen); + hostbuf[hostlen] = 0; + + if (portlen >= sizeof(portbuf)) { + applog(LOG_ERR, "Long port"); + return; + } + memcpy(portbuf, port, portlen); + portbuf[portlen] = 0; + portnum = strtol(port, NULL, 10); + if (portnum <= 0 || portnum >= 65536) { + applog(LOG_ERR, "Bad port %s", portbuf); + return; + } + + rp = tdb_find_remote_byname(namebuf); + if (!rp) { + if (debugging) + applog(LOG_DEBUG, "%s: Not found master %s", + mfname, namebuf); + return; + } + + if (debugging) + applog(LOG_DEBUG, "Found master %s host %s port %u", + namebuf, hostbuf, portnum); + + rp->host = strdup(hostbuf); + rp->port = portnum; + if (!rp->host) + return; + tabled_srv.rep_master = rp; +} + +static void cldu_get_master(const char *mfname, struct ncld_fh *mfh) +{ + struct ncld_read *nrp; + struct timespec tm; + int error; + + nrp = ncld_get(mfh, &error); + if (!nrp) { + applog(LOG_ERR, "CLD get(%s) failed: %d", mfname, error); + return; + } + + if (nrp->length < 3) { + ncld_read_free(nrp); + + /* + * Since master opens, locks, and writes, in that order, + * there's a gap between the lock and write. So, unrace a bit. + */ + tm.tv_sec = 2; + tm.tv_nsec = 0; + nanosleep(&tm, NULL); + + nrp = ncld_get(mfh, &error); + if (!nrp) { + applog(LOG_ERR, "CLD get(%s) failed: %d", mfname, error); + return; + } + + if (nrp->length < 3) { + applog(LOG_ERR, "CLD master(%s) is empty", mfname); + ncld_read_free(nrp); + return; + } + } + + cldu_parse_master(mfname, nrp->ptr, nrp->length); + ncld_read_free(nrp); +} + +/* + * Lock the MASTER file, write or read it as needed. + * N.B. Only call this if you know that mfh is closed or never open: + * right after cldu_set_cldc (disposing of session closes handles), + * or when we were slave and so should not kept mfh ... 
+ * FIXME this will become more interesting when we keep mfh open in slave + * state so we can have outstanding locks for master failover notification. + */ +static int cldu_set_master(struct cld_session *sp) +{ + char *buf; + int len; + int error; + int rc; + + if (!sp->nsp) + return -1; + + /* Maybe drop this later, after notifications work. */ + if (debugging) { + rc = g_list_length(sp->nsp->handles); + applog(LOG_DEBUG, "open handles %d", rc); + } + + sp->mfh = ncld_open(sp->nsp, sp->mfname, + COM_READ | COM_WRITE | COM_LOCK | COM_CREATE, + &error, 0, NULL, NULL); + if (!sp->mfh) { + applog(LOG_ERR, "CLD open(%s) failed: %d", sp->mfname, error); + goto err_open; + } + + error = ncld_trylock(sp->mfh); + if (error) { + applog(LOG_INFO, "CLD lock(%s) failed: %d", sp->mfname, error); + cldu_get_master(sp->mfname, sp->mfh); + goto err_lock; + } + + len = asprintf(&buf, "name: %s\nhost: %s\nport: %u\n", + sp->thisname, sp->thishost, tabled_srv.rep_port); + if (len < 0) { + applog(LOG_ERR, "internal error: no core"); + goto err_wmem; + } + + rc = ncld_write(sp->mfh, buf, len); + if (rc) { + applog(LOG_ERR, "CLD put(%s) failed: %d", sp->mfname, rc); + goto err_write; + } + + free(buf); + return 0; + +err_write: + free(buf); +err_wmem: + /* ncld_unlock() - close will unlock */ +err_lock: + ncld_close(sp->mfh); +err_open: + return -1; +} + static void cldu_tm_rescan(int fd, short events, void *userdata) { struct cld_session *sp = userdata; @@ -162,14 +432,37 @@ static void cldu_tm_rescan(int fd, short events, void *userdata) sp->nsp = NULL; } newactive = cldu_nextactive(sp); - if (cldu_set_cldc(sp, newactive)) { - evtimer_add(&sp->tm_rescan, &cldu_rescan_delay); - return; + if (cldu_set_cldc(sp, newactive)) + goto out; + + if (cldu_set_master(sp) == 0) { + tabled_srv.state_want = ST_W_MASTER; + } else { + if (debugging) + applog(LOG_DEBUG, "Unable to relock %s", + sp->mfname); + tabled_srv.state_want = ST_W_SLAVE; } + cld_update_cb(); + sp->is_dead = false; + } else { + 
if (tabled_srv.state_want == ST_W_SLAVE) { + if (cldu_set_master(sp) == 0) { + tabled_srv.state_want = ST_W_MASTER; + } else { + if (debugging) + applog(LOG_DEBUG, "Unable to lock %s", + sp->mfname); + } + } } + if (scan_peers(sp) != 0) + goto out; scan_chunks(sp); + + out: evtimer_add(&sp->tm_rescan, &cldu_rescan_delay); } @@ -201,12 +494,6 @@ static void cldu_sess_event(void *priv, uint32_t what) static int cldu_set_cldc(struct cld_session *sp, int newactive) { struct cldc_host *hp; - struct ncld_read *nrp; - char buf[100]; - const char *ptr; - int dir_len; - int total_len, rec_len, name_len; - int len; struct timespec tm; int error; int rc; @@ -261,6 +548,7 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive) /* * Then, create the membership file for us. + * We lock it in case of two tabled running with same name by mistake. */ sp->ffh = ncld_open(sp->nsp, sp->ffname, COM_WRITE | COM_LOCK | COM_CREATE, @@ -285,11 +573,7 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive) /* * The usual reason why we get a lock conflict is * restarting too quickly and hitting the previous lock - * that is going to disappear soon. - * - * FIXME: However, it may also be that a master - * is ok we we should become a slave, e.g. start TDB. - * We do not support multi-node, but we should. + * that is going to disappear soon. Just wait it out. */ tm.tv_sec = 10; tm.tv_nsec = 0; @@ -299,21 +583,43 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive) /* * Write the file with our connection parameters. */ - len = snprintf(buf, sizeof(buf), "port: %u\n", tabled_srv.rep_port); - if (len >= sizeof(buf)) { - applog(LOG_ERR, "internal error: overflow for port (%d)", len); - goto err_wmem; - } - - rc = ncld_write(sp->ffh, buf, len); + rc = ncld_write(sp->ffh, "-\n", 2); if (rc) { applog(LOG_ERR, "CLD put(%s) failed: %d", sp->ffname, rc); goto err_write; } /* - * Read the directory. + * Finally, scan cfh to find peers, add with global effects. 
*/ + if (scan_peers(sp) != 0) + goto err_pscan; + + return 0; + +err_pscan: +err_write: +err_lock: + ncld_close(sp->ffh); /* session-close closes these, maybe drop */ +err_fopen: + ncld_close(sp->cfh); +err_copen: + ncld_sess_close(sp->nsp); + sp->nsp = NULL; +err_nsess: +err_addr: + return -1; +} + +static int scan_peers(struct cld_session *sp) +{ + struct ncld_read *nrp; + char buf[65]; + const char *ptr; + int dir_len; + int total_len, rec_len, name_len; + int error; + nrp = ncld_get(sp->cfh, &error); if (!nrp) { applog(LOG_ERR, "CLD get(%s) failed: %d", sp->cfname, error); @@ -336,13 +642,20 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive) else buf[64] = 0; - if (!strcmp(buf, sp->thishost)) { + if (!strcmp(buf, MASTER_FILE)) { + ; /* ignore special entry */ + } else if (!strcmp(buf, sp->thisname)) { if (debugging) applog(LOG_DEBUG, " %s (ourselves)", buf); } else { - if (debugging) - applog(LOG_DEBUG, " %s", buf); - add_remote(buf); + if (tdb_find_remote_byname(buf)) { + if (debugging) + applog(LOG_DEBUG, " %s", buf); + } else { + if (debugging) + applog(LOG_DEBUG, " %s (new)", buf); + add_remote(buf); + } } ptr += total_len; @@ -350,21 +663,9 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive) } ncld_read_free(nrp); - return 0; err_dread: -err_write: -err_wmem: -err_lock: - ncld_close(sp->ffh); /* session-close closes these, maybe drop */ -err_fopen: - ncld_close(sp->cfh); -err_copen: - ncld_sess_close(sp->nsp); - sp->nsp = NULL; -err_nsess: -err_addr: return -1; } @@ -508,9 +809,6 @@ err_mem: return; } -/* - * FIXME need to read port number from the file (port:<space>num). - */ static void add_remote(const char *name) { struct db_remote *rp; @@ -518,10 +816,15 @@ static void add_remote(const char *name) rp = malloc(sizeof(struct db_remote)); if (!rp) return; + memset(rp, 0, sizeof(struct db_remote)); + + /* + * Master assigns global IDs now, distributes them in login protocol. 
+ */ + rp->dbid = DBID_NONE; - rp->port = 8083; - rp->host = strdup(name); - if (!rp->host) { + rp->name = strdup(name); + if (!rp->name) { free(rp); return; } @@ -564,7 +867,8 @@ void cld_init() /* * This initiates our sole session with a CLD instance. */ -int cld_begin(const char *thishost, const char *thisgroup, int verbose) +int cld_begin(const char *thishost, const char *thisgroup, + const char *thisname, int verbose) { static struct cld_session *sp = &ses; struct timespec tm; @@ -575,7 +879,7 @@ int cld_begin(const char *thishost, const char *thisgroup, int verbose) evtimer_set(&ses.tm_rescan, cldu_tm_rescan, &ses); - if (cldu_setgroup(sp, thisgroup, thishost)) { + if (cldu_setgroup(sp, thisgroup, thishost, thisname)) { /* Already logged error */ goto err_group; } @@ -626,6 +930,14 @@ int cld_begin(const char *thishost, const char *thisgroup, int verbose) newactive = cldu_nextactive(sp); } + if (cldu_set_master(sp) == 0) { + if (debugging) + applog(LOG_DEBUG, "Locked %s", sp->mfname); + tabled_srv.state_want = ST_W_MASTER; + } else { + tabled_srv.state_want = ST_W_SLAVE; + } + retry_cnt = 0; for (;;) { if (!scan_chunks(sp)) @@ -696,8 +1008,12 @@ void cld_end(void) sp->ffname = NULL; free(sp->xfname); sp->xfname = NULL; + free(sp->mfname); + sp->mfname = NULL; free(sp->thisgroup); sp->thisgroup = NULL; free(sp->thishost); sp->thishost = NULL; + free(sp->thisname); + sp->thisname = NULL; } diff --git a/server/config.c b/server/config.c index ff4d876..293a5dd 100644 --- a/server/config.c +++ b/server/config.c @@ -224,6 +224,16 @@ static void cfg_elm_end (GMarkupParseContext *context, cc->text = NULL; } + else if (!strcmp(element_name, "TDBRepName")) { + if (!cc->text) { + applog(LOG_WARNING, "TDBRepName element empty"); + return; + } + free(tabled_srv.rep_name); + tabled_srv.rep_name = cc->text; + cc->text = NULL; + } + else if (!strcmp(element_name, "StatusPort")) { if (!cc->text) { applog(LOG_WARNING, "StatusPort element empty"); diff --git a/server/metarep.c 
b/server/metarep.c new file mode 100644 index 0000000..d3eec49 --- /dev/null +++ b/server/metarep.c @@ -0,0 +1,1245 @@ + +/* + * Copyright 2008-2009 Red Hat, Inc. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; see the file COPYING. If not, write to + * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + +#define _GNU_SOURCE +#include "tabled-config.h" + +#include <sys/types.h> +#include <sys/ioctl.h> +#include <stddef.h> +#include <string.h> +#include <stdlib.h> +#include <errno.h> +#include <fcntl.h> +#include <syslog.h> +#include <glib.h> +#include <tdb.h> +#include <netdb.h> +#include <netinet/in.h> +#include "tabled.h" + +/* #define offsetof(type, member) \ + (((unsigned char *)&((type *)0)->member) - (unsigned char *)0) */ + +/* + * flags: + * <31:28> version (currently 1) + * <27:8> unused + * <7:0> rep_msg_type + */ +enum rep_msg_type { REP_MSG_NOP, REP_MSG_LOGIN, REP_MSG_LOGOK, REP_MSG_DATA }; +struct rep_msg_hdr { + unsigned int flags; + unsigned int lenctl; + unsigned int lendata; + unsigned short dst, src; +}; + +/* + * The naming convention is to identify the context in which the function runs. + */ +static int rtdb_master_login_reply(struct db_conn *dbc, unsigned char *msgbuf); + +/* + * Note that the invalid dbid is zero, not -1. 
+ */ +static int make_remote_id(void) +{ + int id; + + for (;;) { + id = rand() % (DBID_MAX+1 - DBID_MIN) + DBID_MIN; + if (!tdb_find_remote_byid(id)) + return id; + } +} + +static int dbl_init(struct db_link *dbl) +{ + dbl->fd = -1; + dbl->state = DBC_INIT; + + dbl->obuflen = 500; + dbl->obuf = malloc(dbl->obuflen); + if (!dbl->obuf) + return -1; + + dbl->ibuflen = sizeof(struct rep_msg_hdr); + dbl->ibuf = malloc(dbl->ibuflen); + if (!dbl->ibuf) { + free(dbl->obuf); + return -1; + } + dbl->cnt = 0; + dbl->explen = 1; + + return 0; +} + +static int dbl_irealloc(struct db_link *dbl, int len) +{ + unsigned char *newbuf; + + if (len > dbl->ibuflen) { + if (!(newbuf = malloc(len))) + return -1; + memcpy(newbuf, dbl->ibuf, dbl->ibuflen); + free(dbl->ibuf); + dbl->ibuf = newbuf; + dbl->ibuflen = len; + } + return 0; +} + +/* + * Expect the dbl->explen, return accumulated dbl->cnt. + */ +static int dbl_expect(struct db_link *dbl) +{ + int rc; + + rc = read(dbl->fd, dbl->ibuf + dbl->cnt, dbl->explen - dbl->cnt); + if (rc < 0) { + if (errno == EAGAIN) + return dbl->cnt; + applog(LOG_ERR, "network read: %s", strerror(errno)); + return -1; + } + if (rc == 0) { + applog(LOG_ERR, "EOF from peer"); /* P3 */ + return -1; + } + dbl->cnt += rc; + return dbl->cnt; +} + +static int dbl_hdr_validate(struct rep_msg_hdr *hdr, int thisid) +{ + unsigned int msgflags; + int srcid, dstid; + + msgflags = GUINT32_FROM_BE(hdr->flags); + if ((msgflags >> 28) != 1) { + applog(LOG_ERR, "Link: bad protocol, flags 0x%08x", msgflags); + return -1; + } + + srcid = GUINT16_TO_BE(hdr->src); + dstid = GUINT16_TO_BE(hdr->dst); + if (srcid == dstid) { + applog(LOG_ERR, "Link: loopback, dbid %d", dstid); + return -1; + } + if (srcid < DBID_MIN || srcid > DBID_MAX) { + applog(LOG_ERR, "Link: bad src dbid %d", srcid); + return -1; + } + if (dstid < DBID_MIN || dstid > DBID_MAX) { + applog(LOG_ERR, "Link: bad dst dbid %d", dstid); + return -1; + } + + if (thisid != 0 && dstid != thisid) { + applog(LOG_ERR, 
"Link: misdirected, dst dbid %d our dbid %d", + dstid, thisid); + return -1; + } + + return 0; +} + +/* + * Login message is different in two ways: + * - src and/or dst may be not set + * - lenctl is the dst name, lendata is the src name + * (but contents not available for validation) + */ +static int dbl_hdr_validate_login(struct rep_msg_hdr *hdr, int thisid) +{ + unsigned int msgflags; + unsigned int len; + + msgflags = GUINT32_FROM_BE(hdr->flags); + if ((msgflags >> 28) != 1) { + applog(LOG_ERR, "Link: bad protocol, flags 0x%08x", msgflags); + return -1; + } + if ((msgflags & 0xff) != REP_MSG_LOGIN) { + applog(LOG_ERR, "Link: bad login request, flags 0x%08x", + msgflags); + return -1; + } + +#if 0 /* A bad idea as long as names are the persistent identifiers. */ + int dstid; + dstid = GUINT32_FROM_BE(hdr->dst); + if (dstid && dstid != thisid) { + applog(LOG_ERR, "Link: login to wrong dbid %d", dstid); + return -1; + } +#endif + + len = GUINT32_FROM_BE(hdr->lenctl); + if (len == 0 || len > 64) { + applog(LOG_ERR, "Link: bad login dst len %u", len); + return -1; + } + + len = GUINT32_FROM_BE(hdr->lendata); + if (len == 0 || len > 64) { + applog(LOG_ERR, "Link: bad login src len %u", len); + return -1; + } + + return 0; +} + +static void dbl_fini(struct db_link *dbl) +{ + if (dbl->writing) { + event_del(&dbl->wrev); + dbl->writing = false; + } + if (dbl->fd >= 0) { + event_del(&dbl->rcev); + close(dbl->fd); + } + if (dbl->ibuf) + free(dbl->ibuf); + if (dbl->obuf) + free(dbl->obuf); +} + +static struct db_conn *tdb_find_byid(struct tablerep *rtdb, int id) +{ + struct db_conn *dbc; + + list_for_each_entry(dbc, &rtdb->conns, link) { + if (dbc->remote && dbc->remote->dbid == id) + return dbc; + } + return NULL; +} + +static struct db_conn *dbc_alloc(struct tablerep *rtdb, struct db_remote *rem) +{ + struct db_conn *dbc; + + dbc = malloc(sizeof(*dbc)); + if (!dbc) + goto out_mem; + memset(dbc, 0, sizeof(*dbc)); + dbc->rtdb = rtdb; + dbc->remote = rem; + if 
(dbl_init(&dbc->lk)) + goto out_dbl; + return dbc; + + out_dbl: + free(dbc); + out_mem: + return NULL; +} + +static void dbc_free(struct db_conn *dbc) +{ + dbl_fini(&dbc->lk); + free(dbc); +} + +/* + * The dbc->remote is known here, see callers. + * + * The db4 code assumes that it is all right to block when sending. Of course + * in our case that means blocking the whole (single-threaded) server. + * It is also all right to drop messages, which is said to hurt performance + * in other ways. Still, as long as tabled is single-threaded we have no choice. + * + * Since we can only send complete messages, and even blocking sockets can + * return short writes, we must buffer output. But we do not create any + * additional queues beyond what is required for the atomicity. + */ +static int tdb_rep_send(struct tablerep *rtdb, struct db_link *dbl, + int dstid, const DBT *ctl, const DBT *rec, + bool easydrop) +{ + unsigned char *p; + struct rep_msg_hdr *hdr; + unsigned int msgflags; + ssize_t len; + ssize_t rc; + + if (dbl->togo) { + /* Maybe poke the output here? Should not be necessary. 
*/ + return 1; + } + + len = sizeof(struct rep_msg_hdr) + ctl->size + rec->size; + if (dbl->obuflen < len) { + free(dbl->obuf); + dbl->obuflen = 0; + dbl->obuf = malloc(len); + if (!dbl->obuf) { + applog(LOG_WARNING, "No core (%ld)", (long) len); + return -1; + } + dbl->obuflen = len; + } + + hdr = (struct rep_msg_hdr *) dbl->obuf; + p = dbl->obuf; + + memset(hdr, 0, sizeof(struct rep_msg_hdr)); + msgflags = (1 << 28) | (REP_MSG_DATA); + hdr->flags = GUINT32_TO_BE(msgflags); + hdr->dst = GUINT16_TO_BE((unsigned short)dstid); + hdr->src = GUINT16_TO_BE((unsigned short)rtdb->thisid); + p += sizeof(struct rep_msg_hdr); + if (ctl->size) { + hdr->lenctl = GUINT32_TO_BE(ctl->size); + memcpy(p, ctl->data, ctl->size); + p += ctl->size; + } + if (rec->size) { + hdr->lendata = GUINT32_TO_BE(rec->size); + memcpy(p, rec->data, rec->size); + p += rec->size; + } + + dbl->done = 0; + dbl->togo = p - dbl->obuf; + + rc = write(dbl->fd, dbl->obuf + dbl->done, dbl->togo); + if (rc < 0) { + dbl->done = 0; + dbl->togo = 0; + applog(LOG_ERR, "socket write error, peer dbid %d: %s", + dstid, strerror(errno)); + return -1; + } + if (rc < dbl->togo) { + if (!dbl->writing) { + if (event_add(&dbl->wrev, NULL)) + applog(LOG_ERR, "event_add failed (write)"); + else + dbl->writing = true; + } + } + dbl->done += rc; + dbl->togo -= rc; + return 0; +} + +static int db4_rep_send(DB_ENV *dbenv, const DBT *ctl, const DBT *rec, + const DB_LSN *lsnp, int envid, uint32_t flags) +{ + struct tablerep *rtdb; + struct db_conn *dbc; + int cnt; + int rc; + + rtdb = (struct tablerep *) + ((char *)dbenv->app_private - offsetof(struct tablerep, tdb)); + + if (envid == DB_EID_BROADCAST) { + cnt = 0; + list_for_each_entry(dbc, &rtdb->conns, link) { + if (dbc->lk.state == DBC_OPEN) { + rc = tdb_rep_send(rtdb, &dbc->lk, + dbc->remote->dbid, + ctl, rec, true); + if (!rc) + cnt++; + if (rc < 0) + dbc->lk.state = DBC_DEAD; + } + } + if (!cnt) + return DB_REP_UNAVAIL; + } else { + dbc = tdb_find_byid(rtdb, envid); + if 
(dbc && dbc->lk.state == DBC_OPEN) { + rc = tdb_rep_send(rtdb, &dbc->lk, + dbc->remote->dbid, ctl, rec, false); + if (rc < 0) { + dbc->lk.state = DBC_DEAD; + return DB_REP_UNAVAIL; + } + if (rc) + return DB_REP_UNAVAIL; + } else { + applog(LOG_INFO, "Send: dbid %d not found", envid); + return DB_REP_UNAVAIL; + } + } + return 0; +} + +static int rtdb_process(struct db_conn *dbc, unsigned char *msgbuf) +{ + struct rep_msg_hdr *hdr = (struct rep_msg_hdr *) msgbuf; + DB_ENV *dbenv = dbc->rtdb->tdb.env; + DBT pctl, prec; + DB_LSN lsn; + struct db_remote *peer; + int rc; + + peer = tdb_find_remote_byid(GUINT16_FROM_BE(hdr->src)); + if (!peer) { + applog(LOG_INFO, "Unknown peer dbid %d", + GUINT16_FROM_BE(hdr->src)); + return -1; + } + + memset(&pctl, 0, sizeof(pctl)); + pctl.data = msgbuf + sizeof(struct rep_msg_hdr); + pctl.size = GUINT32_FROM_BE(hdr->lenctl); + memset(&prec, 0, sizeof(prec)); + prec.data = pctl.data + pctl.size; + prec.size = GUINT32_FROM_BE(hdr->lendata); + rc = dbenv->rep_process_message(dbenv, &pctl, &prec, peer->dbid, &lsn); + switch (rc) { + case DB_REP_ISPERM: + /* + * The "record is written" is normal in db4 operations, + * and shows up so much that we do not print it even under + * if (debugging). + */ + break; + case DB_REP_DUPMASTER: /* DB thinks we have 2 */ + case DB_REP_HANDLE_DEAD: /* what handle? 
*/ + case DB_REP_HOLDELECTION: /* maybe just rep_init it */ + case DB_REP_IGNORE: /* well, whatever */ + case DB_REP_JOIN_FAILURE: + case DB_REP_LEASE_EXPIRED: + case DB_REP_LOCKOUT: + case DB_REP_NEWSITE: + case DB_REP_NOTPERM: + case DB_REP_UNAVAIL: + default: + if (rc) { + applog(LOG_INFO, "rep_process_message: %d (%s)", + rc, db_strerror(rc)); + } + } + + return 0; +} + +static int rtdb_send_more(struct db_link *dbl) +{ + ssize_t rc; + + if (!dbl->togo) { + /* P3 */ applog(LOG_INFO, "stray write event"); + event_del(&dbl->wrev); + dbl->writing = false; + return 0; + } + + rc = write(dbl->fd, dbl->obuf + dbl->done, dbl->togo); + if (rc < 0) { + applog(LOG_ERR, "socket write error: %s", strerror(errno)); + dbl->done = 0; + dbl->togo = 0; + return -1; + } + if (rc < dbl->togo) { + dbl->done += rc; + dbl->togo -= rc; + if (!dbl->writing) { + if (event_add(&dbl->wrev, NULL)) + applog(LOG_ERR, "event_add failed (write)"); + else + dbl->writing = true; + } + } else { + dbl->done = 0; + dbl->togo = 0; + if (dbl->writing) { + event_del(&dbl->wrev); + dbl->writing = false; + } + } + return 0; +} + +static void rtdb_wr_event(int fd, short events, void *userdata) +{ + struct db_link *dbl = userdata; + + if (rtdb_send_more(dbl)) + dbl->state = DBC_DEAD; +} + +static void rtdb_master_tcp_event(int fd, short events, void *userdata) +{ + struct db_conn *dbc = userdata; + struct rep_msg_hdr *hdr; + unsigned msgflags; + int ctllen, reclen; + int len; + int rc; + + switch (dbc->lk.state) { + case DBC_LOGIN: + rc = dbl_expect(&dbc->lk); + if (rc < 0) + goto out_bad_dbc; + if (rc < dbc->lk.explen) + return; + + if (dbc->lk.explen == sizeof(struct rep_msg_hdr)) { + hdr = (struct rep_msg_hdr *) dbc->lk.ibuf; + if (dbl_hdr_validate_login(hdr, dbc->rtdb->thisid)) + goto out_bad_dbc; + + ctllen = GUINT32_FROM_BE(hdr->lenctl); + reclen = GUINT32_FROM_BE(hdr->lendata); + len = sizeof(struct rep_msg_hdr) + ctllen + reclen; + if (dbl_irealloc(&dbc->lk, len) < 0) { + applog(LOG_ERR, "No core 
(%d)", len); + goto out_bad_dbc; + } + dbc->lk.explen = len; + } else { + if (rtdb_master_login_reply(dbc, dbc->lk.ibuf)) + goto out_bad_dbc; + + dbc->lk.state = DBC_OPEN; + dbc->lk.cnt = 0; + dbc->lk.explen = sizeof(struct rep_msg_hdr); + } + break; + case DBC_OPEN: + rc = dbl_expect(&dbc->lk); + if (rc < 0) + goto out_bad_dbc; + if (rc < dbc->lk.explen) + return; + + if (dbc->lk.explen == sizeof(struct rep_msg_hdr)) { + hdr = (struct rep_msg_hdr *) dbc->lk.ibuf; + if (dbl_hdr_validate(hdr, dbc->rtdb->thisid)) + goto out_bad_dbc; + msgflags = GUINT32_FROM_BE(hdr->flags); + if ((msgflags & 0xff) != REP_MSG_DATA) { + applog(LOG_ERR, + "Bad data message, flags 0x%08x", + msgflags); + goto out_bad_dbc; + } + + ctllen = GUINT32_FROM_BE(hdr->lenctl); + reclen = GUINT32_FROM_BE(hdr->lendata); + len = sizeof(struct rep_msg_hdr) + ctllen + reclen; + if (dbl_irealloc(&dbc->lk, len) < 0) { + applog(LOG_ERR, "No core (%d)", len); + goto out_bad_dbc; + } + dbc->lk.explen = len; + } else { + if (rtdb_process(dbc, dbc->lk.ibuf)) + goto out_bad_dbc; + + dbc->lk.state = DBC_OPEN; + dbc->lk.cnt = 0; + dbc->lk.explen = sizeof(struct rep_msg_hdr); + } + break; + default: // DBC_DEAD + if (dbc->remote) { + applog(LOG_INFO, + "Event on a dead slave socket, slave %s", + dbc->remote->host); + } else { + applog(LOG_INFO, + "Event on a dead slave socket"); + } + tdb_conn_scrub_cb(); + } + return; + + out_bad_dbc: + dbc->lk.state = DBC_DEAD; + tdb_conn_scrub_cb(); + return; +} + +static void tdb_conn_event(int fd, short events, void *userdata) +{ + struct tablerep *rtdb = userdata; + struct db_conn *dbc; + struct sockaddr_in6 addr; + socklen_t addrlen; + char host[65], port[15]; + + dbc = dbc_alloc(rtdb, NULL); + if (!dbc) + goto out_dbc; + dbc->lk.explen = sizeof(struct rep_msg_hdr); + dbc->lk.state = DBC_LOGIN; + + addrlen = sizeof(addr); + dbc->lk.fd = accept(fd, (struct sockaddr *) &addr, &addrlen); + if (dbc->lk.fd < 0) { + applog(LOG_ERR, "accept: %s", strerror(errno)); + goto 
out_accept; + } + + getnameinfo((struct sockaddr *) &addr, addrlen, + host, sizeof(host), port, sizeof(port), + NI_NUMERICHOST|NI_NUMERICSERV); + applog(LOG_INFO, "db slave host %s port %s", host, port); + + if (fcntl(dbc->lk.fd, F_SETFL, O_NONBLOCK) < 0) { + applog(LOG_ERR, "fcntl: %s", strerror(errno)); + goto out_flags; + } + + event_set(&dbc->lk.rcev, dbc->lk.fd, EV_READ | EV_PERSIST, + rtdb_master_tcp_event, dbc); + event_set(&dbc->lk.wrev, dbc->lk.fd, EV_WRITE | EV_PERSIST, + rtdb_wr_event, &dbc->lk); + if (event_add(&dbc->lk.rcev, NULL) < 0) { + applog(LOG_ERR, "event_add failed"); + goto out_add; + } + list_add_tail(&dbc->link, &rtdb->conns); + return; + + out_add: + out_flags: + close(dbc->lk.fd); + out_accept: + dbc_free(dbc); + out_dbc: + return; +} + +static int tdb_rep_listen_open(struct sockaddr_in *addr, int addr_len) +{ + int fd; + int on; + int rc; + + fd = socket(addr->sin_family, SOCK_STREAM, 0); + if (fd < 0) + return -errno; + + on = 1; + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) { + rc = -errno; + goto out_err; + } + + if (bind(fd, addr, addr_len) < 0) { + rc = -errno; + goto out_err; + } + + // rc = fsetflags("tcp server", fd, O_NONBLOCK); + // if (rc) { + // rc = -errno; + // goto out_err; + // } + + if (listen(fd, 100) < 0) { + rc = -errno; + goto out_err; + } + + return fd; + + out_err: + close(fd); + return rc; +} + +static int rtdb_rep_listen(struct tablerep *rtdb, unsigned short port) +{ + struct sockaddr_in addr4; + struct sockaddr_in6 addr6; + int rc; + + memset(&addr6, 0, sizeof(addr6)); + addr6.sin6_family = AF_INET6; + addr6.sin6_port = htons(port); + memcpy(&addr6.sin6_addr, &in6addr_any, sizeof(struct in6_addr)); + rc = tdb_rep_listen_open((struct sockaddr_in *)&addr6, sizeof(addr6)); + if (rc < 0) { + if (debugging) + applog(LOG_DEBUG, + "tdb_rep_listen_open(v6, %u) failed: %s", + port, strerror(-rc)); + } else { + rtdb->sockfd6 = rc; + event_set(&rtdb->lsev6, rtdb->sockfd6, EV_READ | EV_PERSIST, + 
tdb_conn_event, rtdb); + if (event_add(&rtdb->lsev6, NULL) < 0) + applog(LOG_ERR, "event_add failed"); + } + + memset(&addr4, 0, sizeof(addr4)); + addr4.sin_family = AF_INET; + addr4.sin_port = htons(port); + addr4.sin_addr.s_addr = htonl(INADDR_ANY); + rc = tdb_rep_listen_open((struct sockaddr_in *)&addr4, sizeof(addr4)); + if (rc < 0) { + if (debugging) + applog(LOG_DEBUG, + "tdb_rep_listen_open(v4, %u) failed: %s", + port, strerror(-rc)); + } else { + rtdb->sockfd4 = rc; + event_set(&rtdb->lsev4, rtdb->sockfd4, EV_READ | EV_PERSIST, + tdb_conn_event, rtdb); + if (event_add(&rtdb->lsev4, NULL) < 0) + applog(LOG_ERR, "event_add failed"); + } + + return 0; +} + +static void rtdb_slave_tcp_event(int fd, short events, void *userdata) +{ + struct db_conn *dbc = userdata; + struct tablerep *rtdb = dbc->rtdb; + struct rep_msg_hdr *hdr; + unsigned msgflags; + int srcid, dstid; + int ctllen, reclen; + int len; + int rc; + + switch (dbc->lk.state) { + case DBC_LOGIN: + rc = dbl_expect(&dbc->lk); + if (rc < 0) + goto out_bad_dbc; + if (rc < dbc->lk.explen) + return; + + hdr = (struct rep_msg_hdr *) dbc->lk.ibuf; + if (dbl_hdr_validate(hdr, rtdb->thisid)) + goto out_bad_dbc; + msgflags = GUINT32_FROM_BE(hdr->flags); + if ((msgflags & 0xff) != REP_MSG_LOGOK) { + applog(LOG_ERR, "Bad login reply, flags 0x%08x", + msgflags); + goto out_bad_dbc; + } + srcid = GUINT16_FROM_BE(hdr->src); + dstid = GUINT16_FROM_BE(hdr->dst); + + if (rtdb->thisid == 0) { + applog(LOG_INFO, "Assigned local dbid %d", dstid); + } else { + if (rtdb->thisid != dstid) { + /* + * Oracle people posted that db won't like this, + * but what can we do. At worst, blow away the + * local db on slave by hand and let it resync. 
+ */ + applog(LOG_INFO, + "Reassigned local dbid from %d to %d", + rtdb->thisid, dstid); + rtdb->thisid = dstid; + } + } + rtdb->thisid = dstid; + + dbc->lk.state = DBC_OPEN; + dbc->lk.cnt = 0; + dbc->lk.explen = sizeof(struct rep_msg_hdr); + + if (tdb_slave_login_cb(srcid)) + goto out_bad_dbc; + break; + case DBC_OPEN: + rc = dbl_expect(&dbc->lk); + if (rc < 0) + goto out_bad_dbc; + if (rc < dbc->lk.explen) + return; + + if (dbc->lk.explen == sizeof(struct rep_msg_hdr)) { + hdr = (struct rep_msg_hdr *) dbc->lk.ibuf; + if (dbl_hdr_validate(hdr, rtdb->thisid)) + goto out_bad_dbc; + msgflags = GUINT32_FROM_BE(hdr->flags); + if ((msgflags & 0xff) != REP_MSG_DATA) { + applog(LOG_ERR, + "Bad data message, flags 0x%08x", + msgflags); + goto out_bad_dbc; + } + + ctllen = GUINT32_FROM_BE(hdr->lenctl); + reclen = GUINT32_FROM_BE(hdr->lendata); + len = sizeof(struct rep_msg_hdr) + ctllen + reclen; + if (dbl_irealloc(&dbc->lk, len) < 0) { + applog(LOG_ERR, "No core (%d)", len); + goto out_bad_dbc; + } + dbc->lk.explen = len; + } else { + if (rtdb_process(dbc, dbc->lk.ibuf)) + goto out_bad_dbc; + + dbc->lk.state = DBC_OPEN; + dbc->lk.cnt = 0; + dbc->lk.explen = sizeof(struct rep_msg_hdr); + } + break; + case DBC_DEAD: + tdb_slave_disc_cb(); + break; + default: + /* P3 */ applog(LOG_INFO, "Event on an unready socket"); + } + return; + + out_bad_dbc: + dbc->lk.state = DBC_DEAD; + tdb_conn_scrub_cb(); + return; +} + +static int rtdb_master_login_reply(struct db_conn *dbc, unsigned char *msgbuf) +{ + struct tablerep *rtdb = dbc->rtdb; + struct rep_msg_hdr *hdr = (struct rep_msg_hdr *) msgbuf; + int srclen, dstlen; + char *srcname, *dstname; + struct db_conn *tmp; + struct db_remote *slave; + int newid; + struct rep_msg_hdr hdrb; + unsigned int msgflags; + int rc; + + /* + * Before proceeding, extract and zero-terminate src and dst names. 
+ */ + dstlen = GUINT32_FROM_BE(hdr->lenctl); + dstname = malloc(dstlen + 1); + if (!dstname) { + applog(LOG_ERR, "No core"); + return -1; + } + memcpy(dstname, msgbuf + sizeof(struct rep_msg_hdr), dstlen); + dstname[dstlen] = 0; + + srclen = GUINT32_FROM_BE(hdr->lendata); + srcname = malloc(srclen + 1); + if (!srcname) { + applog(LOG_ERR, "No core"); + free(dstname); + return -1; + } + memcpy(srcname, msgbuf + sizeof(struct rep_msg_hdr) + dstlen, srclen); + srcname[srclen] = 0; + + if (dbc->remote) { + /* Never happens even with bad clients, our internal problem. */ + applog(LOG_ERR, "Redone login for slave %s (src %s)", + dbc->remote->host, srcname); + goto out_err; + } + + if (strcmp(srcname, rtdb->thisname) == 0) { + applog(LOG_ERR, "Login from aliasing slave %s", srcname); + goto out_err; + } + + slave = tdb_find_remote_byname(srcname); + if (!slave) { + applog(LOG_INFO, "Unknown slave \"%s\"", srcname); + goto out_err; + } + + if (slave->dbid == DBID_NONE) { + newid = GUINT16_FROM_BE(hdr->src); + if (newid == 0 || newid < DBID_MIN || newid > DBID_MAX) { + newid = make_remote_id(); + } + slave->dbid = newid; + } + if (debugging) + applog(LOG_DEBUG, "Link login, slave %s dbid %d", + slave->host, slave->dbid); + + /* + * Dispose of all existing connections. Our current implementation + * provides no security, so it is a proper thing to do. We assume + * that the slave knows what it's doing, maybe it detected a loss + * of TCP connection that we missed. 
+ */ + list_for_each_entry(tmp, &rtdb->conns, link) { + if (tmp->remote == slave) + tmp->lk.state = DBC_DEAD; + } + + dbc->remote = slave; + + memset(&hdrb, 0, sizeof(hdrb)); + msgflags = (1 << 28) | (REP_MSG_LOGOK); + hdrb.flags = GUINT32_TO_BE(msgflags); + hdrb.dst = GUINT16_TO_BE((unsigned short)slave->dbid); + hdrb.src = GUINT16_TO_BE((unsigned short)rtdb->thisid); + + rc = write(dbc->lk.fd, &hdrb, sizeof(hdrb)); + if (rc < 0) { + applog(LOG_INFO, "Write error to peer %s: %s", slave->host, + strerror(errno)); + goto out_err; + } + if (rc < sizeof(hdrb)) { + applog(LOG_INFO, "Write short to peer %s: %d", slave->host, rc); + goto out_err; + } + + return 0; + + out_err: + free(srcname); + free(dstname); + return -1; +} + +static int rtdb_slave_login(struct db_conn *dbc) +{ + struct rep_msg_hdr *hdr; + unsigned char *msgbuf; + unsigned int msgflags; + int dstlen, srclen; + int len; + + dstlen = strlen(dbc->remote->host); + srclen = strlen(dbc->rtdb->thisname); + len = sizeof(struct rep_msg_hdr) + dstlen + srclen; + msgbuf = malloc(len); + if (!msgbuf) + return -1; + + hdr = (struct rep_msg_hdr *) msgbuf; + // memset(hdr, 0, sizeof(struct rep_msg_hdr)); /* no holes */ + msgflags = (1 << 28) | (REP_MSG_LOGIN); + hdr->flags = GUINT32_TO_BE(msgflags); + hdr->lenctl = GUINT32_TO_BE(dstlen); + hdr->lendata = GUINT32_TO_BE(srclen); + hdr->dst = GUINT16_TO_BE((unsigned short)dbc->remote->dbid); + hdr->src = GUINT16_TO_BE((unsigned short)dbc->rtdb->thisid); + memcpy(msgbuf + sizeof(struct rep_msg_hdr), dbc->remote->host, dstlen); + memcpy(msgbuf + sizeof(struct rep_msg_hdr) + dstlen, + dbc->rtdb->thisname, srclen); + + if (write(dbc->lk.fd, msgbuf, len) < len) { + dbc->lk.state = DBC_DEAD; + free(msgbuf); + return -1; + } + dbc->lk.state = DBC_LOGIN; + dbc->lk.explen = sizeof(struct rep_msg_hdr); + dbc->lk.cnt = 0; + free(msgbuf); + return 0; +} + +static int tdb_rep_resolve(struct tablerep *rtdb, int *family, + int addrsize, unsigned char *addr, int *addrlen, + const char 
*hostname, unsigned short port) +{ + char portstr[15]; + struct addrinfo hints; + struct addrinfo *res, *res0; + int rc; + + snprintf(portstr, sizeof(portstr), "%u", port); + + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + + rc = getaddrinfo(hostname, portstr, &hints, &res0); + if (rc) { + applog(LOG_WARNING, "getaddrinfo(%s:%s) failed: %s", + hostname, portstr, gai_strerror(rc)); + return -1; + } + + for (res = res0; res; res = res->ai_next) { + if (res->ai_family != AF_INET && res->ai_family != AF_INET6) + continue; + + if (res->ai_addrlen > addrsize) /* should not happen */ + continue; + + memcpy(addr, res->ai_addr, res->ai_addrlen); + *addrlen = res->ai_addrlen; + *family = res->ai_family; + + freeaddrinfo(res0); + return 0; + } + + freeaddrinfo(res0); + + applog(LOG_WARNING, "getaddrinfo(%s:%s): nothing suitable", + hostname, portstr); + return -1; +} + +static int rtdb_rep_connect(struct db_conn *dbc) +{ + struct db_link *dbl = &dbc->lk; + struct db_remote *master = dbc->remote; + int family; + unsigned char addr[32]; + int addrlen; + int rc; + + rc = tdb_rep_resolve(dbc->rtdb, &family, sizeof(addr), addr, &addrlen, + master->host, master->port); + if (rc < 0) + return -1; + + rc = socket(family, SOCK_STREAM, 0); + if (rc < 0) { + applog(LOG_WARNING, "socket: %s", strerror(errno)); + return -1; + } + dbl->fd = rc; + + if (connect(dbl->fd, (struct sockaddr *)addr, addrlen)) { + applog(LOG_WARNING, "connect(host %s port %u): %s", + master->host, master->port, strerror(errno)); + close(dbl->fd); + return -1; + } + + if (fcntl(dbl->fd, F_SETFL, O_NONBLOCK) < 0) { + applog(LOG_ERR, "fcntl: %s", strerror(errno)); + close(dbl->fd); + return -1; + } + + event_set(&dbl->rcev, dbl->fd, EV_READ | EV_PERSIST, + rtdb_slave_tcp_event, dbc); + if (event_add(&dbl->rcev, NULL) < 0) { + applog(LOG_ERR, "event_add failed"); + close(dbl->fd); + return -1; + } + event_set(&dbl->wrev, dbl->fd, EV_WRITE | EV_PERSIST, 
+ rtdb_wr_event, dbl); + return 0; +} + +static void __rtdb_fini(struct tablerep *rtdb) +{ + struct db_conn *dbc; + + if (rtdb->sockfd4 >= 0) { + event_del(&rtdb->lsev4); + close(rtdb->sockfd4); + rtdb->sockfd4 = -1; + } + if (rtdb->sockfd6 >= 0) { + event_del(&rtdb->lsev6); + close(rtdb->sockfd6); + rtdb->sockfd6 = -1; + } + + while (!list_empty(&rtdb->conns)) { + dbc = list_entry(rtdb->conns.next, struct db_conn, link); + list_del(&dbc->link); + dbc_free(dbc); + } + rtdb->mdbc = NULL; +} + +/* + * return: + * -1 - there was an error, things are in disarray, must call __rtdb_fini. + * 0 - all is up, may call tdb_init if desired. + * 1 - not done yet, just return to dispatch. + */ +static int __rtdb_start(struct tablerep *rtdb, bool we_are_master, + struct db_remote *rep_master, unsigned short rep_port) +{ + struct db_conn *dbc; + + if (we_are_master) { + if (rtdb->thisid == DBID_NONE) + rtdb->thisid = make_remote_id(); + if (rtdb_rep_listen(rtdb, rep_port)) + return -1; + } else { + if (!rep_master) { + applog(LOG_INFO, "No master yet"); /* P3 */ + return -1; + } + if (!rtdb->mdbc) { + dbc = dbc_alloc(rtdb, rep_master); + if (!dbc) + return -1; + dbc->lk.explen = sizeof(struct rep_msg_hdr); + dbc->lk.state = DBC_INIT; + list_add_tail(&dbc->link, &rtdb->conns); + rtdb->mdbc = dbc; + } + switch (rtdb->mdbc->lk.state) { + case DBC_OPEN: + break; + case DBC_INIT: + if (rtdb_rep_connect(rtdb->mdbc)) + return -1; + if (rtdb_slave_login(rtdb->mdbc)) + return -1; + return 1; + case DBC_LOGIN: + /* P3 */ applog(LOG_INFO, "start: no answer"); + return -1; + default: + /* P3 */ applog(LOG_INFO, "start: confusion (state %d)", + rtdb->mdbc->lk.state); + return -1; + } + } + return 0; +} + +int rtdb_init(struct tablerep *rtdb, const char *thisname) +{ + rtdb->thisname = thisname; + + INIT_LIST_HEAD(&rtdb->conns); + rtdb->sockfd4 = -1; + rtdb->sockfd6 = -1; + + // rtdb->mdbc = dbc_alloc(rtdb, NULL); + // if (!rtdb->mdbc) + // return -1; + // rtdb->mdbc->lk.explen = sizeof(struct 
rep_msg_hdr); + // rtdb->mdbc->lk.state = DBC_INIT; + // list_add_tail(&rtdb->mdbc.link, &rtdb->conns); + return 0; +} + +int rtdb_start(struct tablerep *rtdb, + const char *db_home, + bool we_are_master, + struct db_remote *rep_master, unsigned short rep_port, + void (*cb)(enum db_event)) +{ + int rc; + + rc = __rtdb_start(rtdb, we_are_master, rep_master, rep_port); + if (rc < 0) + goto err_out; + if (rc > 0) + return 0; + + /* + * Note that we only get here if either we're master, or slave + * and link is DBC_OPEN. In both cases rtdb->thisid must be set. + */ + if (rtdb->thisid == 0) { /* never happens */ + applog(LOG_WARNING, "Zero own dbid, master %d", we_are_master); + goto err_out; + } + if (tdb_init(&rtdb->tdb, db_home, NULL, "tabled", true, + rtdb->thisid, db4_rep_send, we_are_master, cb)) { + goto err_out; + } + return 0; + +err_out: + __rtdb_fini(rtdb); + return -1; +} + +void rtdb_mc_reset(struct tablerep *rtdb, bool we_are_master, + struct db_remote *rep_master, unsigned short rep_port) +{ + int rc; + + __rtdb_fini(rtdb); + rc = __rtdb_start(rtdb, we_are_master, rep_master, rep_port); + if (rc < 0) { + /* + * If we failed to reconnect immediately, we do not retry. + * This is because db4 has its own timeouts, so there's really + * no point in doing anything else: we would only interfere. + * From now on, rely on CLD to drive the attempts to reconnect. + */ + /* P3 */ applog(LOG_INFO, "failed to reconnect (%d)", rc); + } +} + +void rtdb_dbc_scrub(struct tablerep *rtdb) +{ + struct db_conn *dbc, *tmp; + + list_for_each_entry_safe(dbc, tmp, &rtdb->conns, link) { + if (dbc->lk.state == DBC_DEAD) { + /* + * This printout is misleading, since every remote + * may have several connections. But how to fix it? 
+ */ + if (dbc->remote) { + applog(LOG_INFO, "Closing, peer %s", + dbc->remote->host); + } else { + applog(LOG_INFO, "Closing"); + } + if (dbc == rtdb->mdbc) + rtdb->mdbc = NULL; + list_del(&dbc->link); + dbc_free(dbc); + } + } +} + +/* + * This wants to be both in here and in tdb.c. Problem. + */ +int rtdb_restart(struct tablerep *rtdb, bool we_are_master) +{ + DB_ENV *dbenv = rtdb->tdb.env; + unsigned int rep_flags; + int rc; + + rep_flags = we_are_master ? DB_REP_MASTER : DB_REP_CLIENT; + rc = dbenv->rep_start(dbenv, NULL, rep_flags); + if (rc) { + dbenv->err(dbenv, rc, "rep_start(0x%x)", rep_flags); + return -1; + } + return 0; +} + +void rtdb_fini(struct tablerep *rtdb) +{ + __rtdb_fini(rtdb); + tdb_fini(&rtdb->tdb); +} + diff --git a/server/object.c b/server/object.c index f8e7b12..3801e94 100644 --- a/server/object.c +++ b/server/object.c @@ -39,7 +39,7 @@ static int object_find(DB_TXN *txn, const char *bucket, const char *key, struct db_obj_ent *pobj) { - DB *objs = tdb.objs; + DB *objs = tdbrep.tdb.objs; struct db_obj_key *okey; size_t alloc_len; DBT pkey, pval; @@ -72,7 +72,7 @@ static int object_find(DB_TXN *txn, const char *bucket, const char *key, static bool __object_del(DB_TXN *txn, const char *bucket, const char *key) { - DB *objs = tdb.objs; + DB *objs = tdbrep.tdb.objs; struct db_obj_key *okey; size_t okey_len; DBT pkey; @@ -100,7 +100,7 @@ static bool __object_del(DB_TXN *txn, const char *bucket, const char *key) bool object_del_acls(DB_TXN *txn, const char *bucket, const char *key) { - DB *acls = tdb.acls; + DB *acls = tdbrep.tdb.acls; struct db_acl_key *akey; size_t alloc_len; DBT pkey; @@ -163,8 +163,8 @@ bool object_del(struct client *cli, const char *user, int rc; enum errcode err = InternalError; size_t alloc_len; - DB_ENV *dbenv = tdb.env; - DB *objs = tdb.objs; + DB_ENV *dbenv = tdbrep.tdb.env; + DB *objs = tdbrep.tdb.objs; struct db_obj_key *okey; struct db_obj_ent obje; DBT pkey, pval; @@ -326,9 +326,9 @@ static bool 
object_put_end(struct client *cli) struct db_obj_ent oldobj; bool delobj; size_t alloc_len; - DB_ENV *dbenv = tdb.env; + DB_ENV *dbenv = tdbrep.tdb.env; DBT pkey, pval; - DB *objs = tdb.objs; + DB *objs = tdbrep.tdb.objs; DB_TXN *txn = NULL; GByteArray *string_data; GArray *string_lens; @@ -786,7 +786,7 @@ static bool object_put_body(struct client *cli, const char *user, return cli_err(cli, InternalError); } - objid = objid_next(&tabled_srv.object_count, &tdb); + objid = objid_next(&tabled_srv.object_count, &tdbrep.tdb); rc = open_chunks(&cli->out_ch, &tabled_srv.all_stor, cli, objid, content_len); @@ -865,9 +865,9 @@ static bool object_put_acls(struct client *cli, const char *user, { enum errcode err = InternalError; enum ReqACLC canacl; - DB_ENV *dbenv = tdb.env; + DB_ENV *dbenv = tdbrep.tdb.env; DB_TXN *txn = NULL; - DB *objs = tdb.objs; + DB *objs = tdbrep.tdb.objs; char *hdr; char timestr[64]; int rc; @@ -1130,7 +1130,7 @@ static bool object_get_body(struct client *cli, const char *user, bool access_ok, modified = true; GString *extra_hdr; size_t alloc_len; - DB *objs = tdb.objs; + DB *objs = tdbrep.tdb.objs; struct db_obj_key *okey; struct db_obj_ent *obj = NULL; DBT pkey, pval; diff --git a/server/replica.c b/server/replica.c index ac14cb2..1b5e832 100644 --- a/server/replica.c +++ b/server/replica.c @@ -612,8 +612,8 @@ static void rep_scan_verify(struct rep_arg *arg, static void rep_add_nid(unsigned int klen, struct db_obj_key *key, uint32_t nid) { - DB_ENV *db_env = tdb.env; - DB *db_objs = tdb.objs; + DB_ENV *db_env = tdbrep.tdb.env; + DB *db_objs = tdbrep.tdb.objs; DB_TXN *db_txn; DBT pkey, pval; struct db_obj_ent *obj; @@ -749,8 +749,8 @@ static void rep_scan(struct rep_arg *arg) g_mutex_unlock(kscan_mutex); memset(&cur, 0, sizeof(struct cursor)); /* enough to construct */ - cur.db_env = tdb.env; - cur.db_objs = tdb.objs; + cur.db_env = tdbrep.tdb.env; + cur.db_objs = tdbrep.tdb.objs; kcnt = 0; for (;;) { diff --git a/server/server.c b/server/server.c 
index 814afec..8859847 100644 --- a/server/server.c +++ b/server/server.c @@ -97,12 +97,15 @@ struct server tabled_srv = { .config = "/etc/tabled.conf", }; -struct tabledb tdb; +struct tablerep tdbrep; enum { TT_CMD_DUMP, TT_CMD_TDBST_MASTER, - TT_CMD_TDBST_SLAVE + TT_CMD_TDBST_SLAVE, + TT_CMD_MASTER_LINK_RESET, + TT_CMD_LINK_SCRUB, + TT_CMDNUM }; struct compiled_pat patterns[] = { @@ -114,7 +117,11 @@ struct compiled_pat patterns[] = { }; static char *state_name_tdb[ST_TDBNUM] = { - "Init", "Open", "Active", "Master", "Slave" + "Init", "Open", "Master", "Slave" +}; + +static char *cmd_name_tdb[TT_CMDNUM] = { + "Dump", "GoMaster", "GoSlave", "MasterLinkReset", "LinkScrub" }; static struct { @@ -340,7 +347,7 @@ static int authcheck(struct http_req *req, char *extra_bucket, * not match. */ - rc = tdb.passwd->get(tdb.passwd, NULL, &key, &val, 0); + rc = tdbrep.tdb.passwd->get(tdbrep.tdb.passwd, NULL, &key, &val, 0); if (rc) { pass = strdup(""); @@ -350,7 +357,7 @@ static int authcheck(struct http_req *req, char *extra_bucket, char s[64]; snprintf(s, 64, "get user '%s'", user); - tdb.passwd->err(tdb.passwd, rc, s); + tdbrep.tdb.passwd->err(tdbrep.tdb.passwd, rc, s); } } else { pass = val.data; @@ -387,8 +394,22 @@ static void stats_signal(int signo) static void stats_dump(void) { - applog(LOG_INFO, "STATE: TDB %s", - state_name_tdb[tabled_srv.state_tdb]); + struct db_remote *rp; + GList *tmp; + + applog(LOG_INFO, "TDB: group %s state %s host %s rep_port %d dbid %d%s", + tabled_srv.group, state_name_tdb[tabled_srv.state_tdb], + tabled_srv.ourhost, tabled_srv.rep_port, tdbrep.thisid, + (tabled_srv.mc_delay)? 
" mc_delay": ""); + for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) { + rp = tmp->data; + applog(LOG_INFO, "PN: name %s dbid %d", rp->name, rp->dbid); + if (rp->host) + applog(LOG_INFO, "PN: host %s port %d", + rp->host, rp->port); + if (rp == tabled_srv.rep_master) + applog(LOG_INFO, "PN (master)"); + } applog(LOG_INFO, "STATS: poll %lu event %lu tcp_accept %lu opt_write %lu", tabled_srv.stats.poll, @@ -403,11 +424,17 @@ static void stats_dump(void) bool stat_status(struct client *cli, GList *content) { + struct db_remote *rp; + GList *tmp; char *str; + int rc; /* * The loadavg is system dependent, we'll figure it out later. * On Linux, applications read from /proc/loadavg. + * + * The listening info duplicates the hostname until we split + * the replication identifier from hostname. */ if (asprintf(&str, "<h1>Status</h1>" @@ -415,11 +442,50 @@ bool stat_status(struct client *cli, GList *content) tabled_srv.ourhost, tabled_srv.port) < 0) return false; content = g_list_append(content, str); + if (asprintf(&str, - "<p>State: TDB %s</p>\r\n", - state_name_tdb[tabled_srv.state_tdb]) < 0) + "<p>TDB: group %s " + "state %s host %s rep_port %d dbid %d%s</p>\r\n", + tabled_srv.group, state_name_tdb[tabled_srv.state_tdb], + tabled_srv.ourhost, tabled_srv.rep_port, tdbrep.thisid, + (tabled_srv.mc_delay)? 
" mc_delay": "") < 0) return false; content = g_list_append(content, str); + + if (tabled_srv.rep_remotes) { + if (asprintf(&str, "<p>") < 0) + return false; + content = g_list_append(content, str); + for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) { + rp = tmp->data; + rc = asprintf(&str, "Peer: name %s dbid %d", + rp->name, rp->dbid); + if (rc < 0) + return false; + content = g_list_append(content, str); + if (rp->host) { + rc = asprintf(&str, " host %s port %d", + rp->host, rp->port); + if (rc < 0) + return false; + content = g_list_append(content, str); + } + if (rp == tabled_srv.rep_master) { + str = strdup(" (master)"); + if (!str) + return false; + content = g_list_append(content, str); + } + rc = asprintf(&str, "<br />\r\n"); + if (rc < 0) + return false; + content = g_list_append(content, str); + } + if (asprintf(&str, "</p>\r\n") < 0) + return false; + content = g_list_append(content, str); + } + if (asprintf(&str, "<p>Stats: " "poll %lu event %lu tcp_accept %lu opt_write %lu</p>\r\n" @@ -1421,7 +1487,7 @@ static void add_chkpt_timer(void) static void tdb_checkpoint(int fd, short events, void *userdata) { - DB_ENV *dbenv = tdb.env; + DB_ENV *dbenv = tdbrep.tdb.env; int rc; if (debugging) @@ -1436,29 +1502,50 @@ static void tdb_checkpoint(int fd, short events, void *userdata) add_chkpt_timer(); } +static void add_reup_timer(void) +{ + static const struct timeval tv = { TABLED_REUP_SEC, 0 }; + + if (evtimer_add(&tabled_srv.reup_timer, &tv) < 0) + applog(LOG_WARNING, "unable to add reup timer"); +} + +static void tdb_reup(int fd, short events, void *userdata) +{ + + if (tabled_srv.state_want == ST_W_MASTER && + tabled_srv.state_tdb == ST_TDB_MASTER) { + /* + * An upgrade failed, retry. 
+ */ + if (rtdb_restart(&tdbrep, true)) { + applog(LOG_WARNING, "Cannot restart to master"); + add_reup_timer(); + } + } +} + static void tdb_state_cb(enum db_event event) { unsigned char cmd; switch (event) { case TDB_EV_ELECTED: - /* - * Safe to stop ignoring bogus client indication, - * so unmute us by advancing the state. - */ - if (tabled_srv.state_tdb == ST_TDB_OPEN) - tabled_srv.state_tdb = ST_TDB_ACTIVE; + /* Just ignore this, we only care for the end state. */ break; case TDB_EV_CLIENT: + /* P3 */ applog(LOG_INFO, "TDB event: slave, state %s", state_name_tdb[tabled_srv.state_tdb]); + goto overmsg; case TDB_EV_MASTER: + /* P3 */ applog(LOG_INFO, "TDB event: master, state %s", state_name_tdb[tabled_srv.state_tdb]); + overmsg: /* * This callback runs on the context of the replication * manager thread, and calling any of our functions thus * turns our program into a multi-threaded one. Instead * we signal the main thread to do the processing. */ - if (tabled_srv.state_tdb != ST_TDB_INIT && - tabled_srv.state_tdb != ST_TDB_OPEN) { + if (tabled_srv.state_tdb != ST_TDB_INIT) { if (event == TDB_EV_MASTER) cmd = TT_CMD_TDBST_MASTER; else @@ -1472,6 +1559,55 @@ static void tdb_state_cb(enum db_event event) } } +void cld_update_cb(void) +{ + switch (tabled_srv.state_want) { + case ST_W_MASTER: + if (tabled_srv.state_tdb == ST_TDB_MASTER) { + ; /* CLD caught up to DB, better late than never */ + } else if (tabled_srv.state_tdb == ST_TDB_SLAVE) { + /* CLD tells us to upgrade, do it */ + if (rtdb_restart(&tdbrep, true)) { + applog(LOG_WARNING, + "Unable to restart to master"); + /* + * Don't try rtdb_fini here, will end in a hang. + * Instead, retry endlessly until it succeeds. 
+ */ + add_reup_timer(); + } + } else { + applog(LOG_WARNING, "Want Master while in state %s", + state_name_tdb[tabled_srv.state_tdb]); + } + break; + case ST_W_SLAVE: + if (tabled_srv.state_tdb == ST_TDB_SLAVE) { + ; /* all good */ + } else if (tabled_srv.state_tdb == ST_TDB_MASTER) { + /* + * OK, this is bad. We lost our CLD session and some + * other node went master on us. Even if we downgrade + * the database now, some clients may have done some + * operations while CLD was bouncing. Complain loudly. + */ + applog(LOG_WARNING, + "Downgrading the database," + " data loss is possible"); + if (rtdb_restart(&tdbrep, false)) { + tabled_srv.state_tdb = ST_TDB_INIT; + rtdb_fini(&tdbrep); + } + } else { + applog(LOG_WARNING, "Want Slave while in state %s", + state_name_tdb[tabled_srv.state_tdb]); + } + break; + default: + ; + } +} + /* * Due to the way storage_node management is tightly woven into the * server, the management of nodes is not in storage.c, which deals @@ -1485,7 +1621,6 @@ int stor_update_cb(void) { int num_up; struct storage_node *stn; - unsigned int env_flags; if (debugging) applog(LOG_DEBUG, "Know of potential %d storage node(s)", @@ -1518,15 +1653,13 @@ int stor_update_cb(void) * We initiate operations even if there's no redundancy in order * to permit bootstrapping and build-time self-checking. 
*/ +/* P3 */ applog(LOG_INFO, "storage updated, TDB state %s", state_name_tdb[tabled_srv.state_tdb]); if (tabled_srv.state_tdb == ST_TDB_INIT) { tabled_srv.state_tdb = ST_TDB_OPEN; - - env_flags = DB_RECOVER | DB_CREATE | DB_THREAD; - if (tdb_init(&tdb, tabled_srv.tdb_dir, NULL, - env_flags, "tabled", true, - tabled_srv.rep_remotes, - tabled_srv.ourhost, tabled_srv.rep_port, - tdb_state_cb)) { + if (rtdb_start(&tdbrep, tabled_srv.tdb_dir, + tabled_srv.state_want == ST_W_MASTER, + tabled_srv.rep_master, + tabled_srv.rep_port, tdb_state_cb)) { tabled_srv.state_tdb = ST_TDB_INIT; applog(LOG_ERR, "Failed to open TDB, limping"); } @@ -1535,10 +1668,122 @@ int stor_update_cb(void) * FIXME This is where we should process redundancy decreases. */ ; + } else if (tabled_srv.state_tdb == ST_TDB_SLAVE) { + if (tabled_srv.state_want == ST_W_MASTER) { + if (rtdb_restart(&tdbrep, true)) { + applog(LOG_WARNING, + "Failed to restart to master"); + add_reup_timer(); + } + } } return num_up; } +int tdb_slave_login_cb(int srcid) +{ + struct db_remote *master; + + master = tabled_srv.rep_master; + if (!master) { + applog(LOG_INFO, "No master at login"); + return -1; + } + if (master->dbid == 0) { + applog(LOG_INFO, "Master dbid %d", srcid); + } else { + if (master->dbid != srcid) { + /* + * This is probably bad news. Perhaps master rebooted + * on the other side of the network partition and yet + * somehow won a lock in CLD, or something even weirder. + * But we don't know. 
+ */ + applog(LOG_INFO, + "Master switch from dbid %d to dbid %d", + master->dbid, srcid); + } + } + master->dbid = srcid; + + if (tabled_srv.state_tdb == ST_TDB_OPEN) { + applog(LOG_INFO, "Established link, master %s dbid %d", + master->name, master->dbid); + if (tabled_srv.state_want != ST_W_SLAVE) { + applog(LOG_ERR, "Unexpected TDB state %s, limping", + state_name_tdb[tabled_srv.state_tdb]); + rtdb_fini(&tdbrep); + tabled_srv.state_tdb = ST_TDB_INIT; + return -1; + } + if (rtdb_start(&tdbrep, tabled_srv.tdb_dir, + false, + master, + tabled_srv.rep_port, tdb_state_cb)) { + tabled_srv.state_tdb = ST_TDB_INIT; + applog(LOG_ERR, "Failed to open TDB, limping"); + return -1; + } + } else if (tabled_srv.state_tdb == ST_TDB_SLAVE) { + applog(LOG_INFO, "Recovered master connection"); + } else { + applog(LOG_INFO, "Confused about connections"); + } + return 0; +} + +void tdb_slave_disc_cb(void) +{ + static const struct timeval tv = { TABLED_MCWAIT_SEC, 0 }; + + if (tabled_srv.mc_delay) + return; + evtimer_add(&tabled_srv.mc_timer, &tv); + tabled_srv.mc_delay = true; +} + +static void tdb_mc_delay(int fd, short events, void *userdata) +{ + static const unsigned char cmd = TT_CMD_MASTER_LINK_RESET; + + tabled_srv.mc_delay = false; + write(tabled_srv.ev_pipe[1], &cmd, 1); +} + +void tdb_conn_scrub_cb(void) +{ + unsigned char cmd; + + cmd = TT_CMD_LINK_SCRUB; + write(tabled_srv.ev_pipe[1], &cmd, 1); +} + +struct db_remote *tdb_find_remote_byname(const char *name) +{ + struct db_remote *rp; + GList *tmp; + + for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) { + rp = tmp->data; + if (strcmp(rp->name, name) == 0) + return rp; + } + return NULL; +} + +struct db_remote *tdb_find_remote_byid(int id) +{ + struct db_remote *rp; + GList *tmp; + + for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) { + rp = tmp->data; + if (rp->dbid == id) + return rp; + } + return NULL; +} + static int net_open_socket(int addr_fam, int sock_type, int sock_prot, int addr_len, void 
*addr_ptr, bool is_status) { @@ -1833,26 +2078,66 @@ static void compile_patterns(void) } } -static void tdb_state_process(enum st_tdb new_state) +static void tdb_startup(void) { unsigned int db_flags; - if (debugging) - applog(LOG_DEBUG, "TDB state > %s", state_name_tdb[new_state]); - if ((new_state == ST_TDB_MASTER || new_state == ST_TDB_SLAVE) && - tabled_srv.state_tdb == ST_TDB_ACTIVE) { + db_flags = DB_CREATE | DB_THREAD; + if (tdb_up(&tdbrep.tdb, db_flags)) + return; + if (objid_init(&tabled_srv.object_count, &tdbrep.tdb)) { + tdb_down(&tdbrep.tdb); + return; + } + add_chkpt_timer(); + rep_start(); + net_listen_client(); +} - db_flags = DB_CREATE | DB_THREAD; - if (tdb_up(&tdb, db_flags)) - return; +static void tdb_state_process(enum st_tdb new_state) +{ - if (objid_init(&tabled_srv.object_count, &tdb)) { - tdb_down(&tdb); - return; + applog(LOG_INFO, "TDB state %s > %s", + state_name_tdb[tabled_srv.state_tdb], state_name_tdb[new_state]); + + if (tabled_srv.state_tdb == ST_TDB_OPEN) { + if (new_state == ST_TDB_MASTER) { + if (tabled_srv.state_want == ST_W_MASTER) { + tdb_startup(); + } else { + /* + * We want slave if we cannot connect to CLD, + * or we cannot lock the master file, which + * means that other master may exist. + * But the db goes master on us, so + * either the other master is dead or we're + * misconfigured so DBs cannot talk. + * Either way, we should poke db until the + * desired result is accomplished. XXX + */ + applog(LOG_INFO, "TDB went Master on us"); + } + } else if (new_state == ST_TDB_SLAVE) { + applog(LOG_INFO, "TDB went Slave, so whatever"); + ; + } else { + applog(LOG_ERR, "TDB went to unexpected state"); + } + } else if (tabled_srv.state_tdb == ST_TDB_SLAVE) { + if (new_state == ST_TDB_MASTER) { + if (tabled_srv.state_want == ST_W_MASTER) { + tdb_startup(); + } else { + /* + * This is either a net split or CLD is doing + * its timeouts and so we do not want to be + * a master yet. 
+ */ + applog(LOG_ERR, "TDB upgraded on us"); + } + } else { + applog(LOG_ERR, "TDB is confused"); } - add_chkpt_timer(); - rep_start(); - net_listen_client(); } } @@ -1871,6 +2156,11 @@ static void internal_event(int fd, short events, void *userdata) abort(); } + if (debugging) { + applog(LOG_DEBUG, "Context Event %s, TDB state %s", + cmd_name_tdb[cmd], state_name_tdb[tabled_srv.state_tdb]); + } + switch (cmd) { case TT_CMD_DUMP: stats_dump(); @@ -1890,6 +2180,15 @@ static void internal_event(int fd, short events, void *userdata) } break; + case TT_CMD_MASTER_LINK_RESET: + rtdb_mc_reset(&tdbrep, tabled_srv.state_want == ST_W_MASTER, + tabled_srv.rep_master, tabled_srv.rep_port); + break; + + case TT_CMD_LINK_SCRUB: + rtdb_dbc_scrub(&tdbrep); + break; + default: applog(LOG_WARNING, "%s BUG: command 0x%x", __func__, cmd); break; @@ -1905,6 +2204,7 @@ int main (int argc, char *argv[]) INIT_LIST_HEAD(&tabled_srv.all_stor); INIT_LIST_HEAD(&tabled_srv.write_compl_q); tabled_srv.state_tdb = ST_TDB_INIT; + tabled_srv.rep_next_id = DBID_MIN; /* isspace() and strcasecmp() consistency requires this */ setlocale(LC_ALL, "C"); @@ -1978,6 +2278,8 @@ int main (int argc, char *argv[]) tabled_srv.evbase_main = event_init(); event_base_rep = event_base_new(); evtimer_set(&tabled_srv.chkpt_timer, tdb_checkpoint, NULL); + evtimer_set(&tabled_srv.mc_timer, tdb_mc_delay, NULL); + evtimer_set(&tabled_srv.reup_timer, tdb_reup, NULL); /* set up internal communication pipe */ if (pipe(tabled_srv.ev_pipe) < 0) { @@ -1991,6 +2293,13 @@ int main (int argc, char *argv[]) goto err_pevt; } + /* late-construct structures with allocations */ + if (rtdb_init(&tdbrep, tabled_srv.ourhost)) { + applog(LOG_WARNING, "rtdb_init"); + rc = 1; + goto err_rtdb; + } + /* set up server networking */ if (tabled_srv.status_port) { if (net_open_known(tabled_srv.status_port, true) == 0) @@ -2000,7 +2309,8 @@ int main (int argc, char *argv[]) if (rc) goto err_out_net; - if (cld_begin(tabled_srv.ourhost, 
tabled_srv.group, verbose) != 0) { + if (cld_begin(tabled_srv.ourhost, tabled_srv.group, + tabled_srv.rep_name, verbose) != 0) { rc = 1; goto err_cld_session; } @@ -2023,13 +2333,13 @@ err_cld_session: err_out_net: if (tabled_srv.state_tdb == ST_TDB_MASTER || tabled_srv.state_tdb == ST_TDB_SLAVE) { - tdb_down(&tdb); - tdb_fini(&tdb); - } else if (tabled_srv.state_tdb == ST_TDB_OPEN || - tabled_srv.state_tdb == ST_TDB_ACTIVE) { - tdb_fini(&tdb); + tdb_down(&tdbrep.tdb); + rtdb_fini(&tdbrep); + } else if (tabled_srv.state_tdb == ST_TDB_OPEN) { + rtdb_fini(&tdbrep); } -/* err_tdb_init: */ +err_rtdb: + event_del(&tabled_srv.pevt); err_pevt: close(tabled_srv.ev_pipe[0]); close(tabled_srv.ev_pipe[1]); diff --git a/server/tabled.h b/server/tabled.h index ff419e3..c90511c 100644 --- a/server/tabled.h +++ b/server/tabled.h @@ -45,6 +45,8 @@ enum { TABLED_CHKPT_SEC = 60 * 5, /* secs between db4 chkpt */ TABLED_RESCAN_SEC = 60*3 + 7, /* secs btw key rescans */ + TABLED_MCWAIT_SEC = 35, /* secs to moderate reconn. 
*/ + TABLED_REUP_SEC = 35, /* secs to retry rtdb_restart */ CHUNK_REBOOT_TIME = 3*60, /* secs to declare chunk dead */ @@ -200,8 +202,12 @@ struct client { char req_buf[CLI_REQ_BUF_SZ]; /* input buffer */ }; +enum st_want { + ST_W_INIT, ST_W_MASTER, ST_W_SLAVE +}; + enum st_tdb { - ST_TDB_INIT, ST_TDB_OPEN, ST_TDB_ACTIVE, ST_TDB_MASTER, ST_TDB_SLAVE, + ST_TDB_INIT, ST_TDB_OPEN, ST_TDB_MASTER, ST_TDB_SLAVE, ST_TDBNUM }; @@ -218,6 +224,17 @@ struct server_stats { unsigned long max_write_buf; }; +#define DBID_NONE 0 +#define DBID_MIN 2 +#define DBID_MAX 105 + +struct db_remote { /* other DB nodes */ + char *name; /* do not resolve as a host */ + char *host; + unsigned short port; + int dbid; /* signed in db4, traditional */ +}; + struct listen_cfg { /* bool encrypt; */ /* char *host; */ @@ -233,6 +250,8 @@ struct server { int ev_pipe[2]; struct event pevt; struct list_head write_compl_q; /* list of done writes */ + bool mc_delay; + struct event mc_timer; char *config; /* config file (static) */ @@ -242,6 +261,7 @@ struct server { char *port_file; char *chunk_user; /* username for stc_new */ char *chunk_key; /* key for stc_new */ + char *rep_name; /* db4 replication name */ unsigned short rep_port; /* db4 replication port */ char *status_port; /* status webserver */ char *group; /* our group (both T and Ch) */ @@ -249,12 +269,16 @@ struct server { char *ourhost; /* use this if DB master */ struct database *db; /* database handle */ GList *rep_remotes; + struct db_remote *rep_master; /* if we're slave */ + int rep_next_id; + struct event reup_timer; GList *sockets; struct list_head all_stor; /* struct storage_node */ int num_stor; /* number of storage_node's */ uint64_t object_count; + enum st_want state_want; enum st_tdb state_tdb; enum st_net state_net; @@ -263,7 +287,55 @@ struct server { struct server_stats stats; /* global statistics */ }; -extern struct tabledb tdb; +/* + * Low-level channel, for both sides. + * + * The combined link state confuses session (e.g. 
login) and the framing, which + * is not pretty but works. At least we have a separate link-state struct. + * + * In a settled state, db_conn corresponds 1:1 to db_remote, but + * it's not necessarily so when connections are being established. + */ +enum dbc_state { DBC_INIT, DBC_LOGIN, DBC_OPEN, DBC_DEAD }; + +struct db_link { + int fd; + enum dbc_state state; + + bool writing; + struct event wrev; /* when writing */ + unsigned char *obuf; + int obuflen; + int done, togo; + + struct event rcev; /* whenever fd >= 0 */ + unsigned char *ibuf; + int ibuflen; /* currently allocated ibuf */ + int cnt; /* currently in ibuf */ + int explen; /* expected length */ +}; + +struct db_conn { /* a connection with other DB node */ + struct tablerep *rtdb; + struct db_remote *remote; + struct list_head link; + + struct db_link lk; +}; + +struct tablerep { + struct tabledb tdb; + const char *thisname; + int thisid; + + int sockfd4, sockfd6; + struct event lsev4, lsev6; + struct list_head conns; // struct db_conn + + struct db_conn *mdbc; +}; + +extern struct tablerep tdbrep; /* bucket.c */ extern bool has_access(const char *user, const char *bucket, const char *key, @@ -295,7 +367,8 @@ extern void cli_in_end(struct client *cli); /* cldu.c */ extern void cld_init(void); -extern int cld_begin(const char *fqdn, const char *group, int verbose); +extern int cld_begin(const char *fqdn, const char *group, const char *name, + int verbose); extern void cldu_add_host(const char *host, unsigned int port); extern void cld_end(void); @@ -332,7 +405,13 @@ extern bool cli_write_start(struct client *cli); extern bool cli_write_run_compl(void); extern int cli_req_avail(struct client *cli); extern void applog(int prio, const char *fmt, ...); +extern void cld_update_cb(void); extern int stor_update_cb(void); +extern int tdb_slave_login_cb(int srcid); +extern void tdb_slave_disc_cb(void); +extern void tdb_conn_scrub_cb(void); +extern struct db_remote *tdb_find_remote_byname(const char *name); +extern 
struct db_remote *tdb_find_remote_byid(int id); /* status.c */ extern bool stat_evt_http_req(struct client *cli, unsigned int events); @@ -374,4 +453,16 @@ extern void rep_start(void); extern void rep_stats(void); extern bool rep_status(struct client *cli, GList *content); +/* metarep.c */ +extern int rtdb_init(struct tablerep *rtdb, const char *thishost); +extern int rtdb_start(struct tablerep *rtdb, const char *db_home, + bool we_are_master, + struct db_remote *rep_master, unsigned short rep_port, + void (*cb)(enum db_event)); +extern void rtdb_mc_reset(struct tablerep *rtdb, bool we_are_master, + struct db_remote *rep_master, unsigned short rep_port); +extern void rtdb_dbc_scrub(struct tablerep *rtdb); +extern int rtdb_restart(struct tablerep *rtdb, bool we_are_master); +extern void rtdb_fini(struct tablerep *rtdb); + #endif /* __TABLED_H__ */ diff --git a/server/tdbadm.c b/server/tdbadm.c index 86fa4b3..4bd26cc 100644 --- a/server/tdbadm.c +++ b/server/tdbadm.c @@ -45,11 +45,10 @@ enum various_modes { static int mode_adm; static unsigned long invalid_lines; static char *tdb_dir; -static unsigned short rep_port; static char *config = "/etc/tabled.conf"; -static char *ourhost; static struct tabledb tdb; +static bool tdb_is_master; const char *argp_program_version = PACKAGE_VERSION; @@ -110,7 +109,6 @@ static void cfg_elm_end(GMarkupParseContext *context, { struct config_context *cc = user_data; struct stat statb; - int n; if (!strcmp(element_name, "TDB") && cc->text) { if (!tdb_dir) { @@ -134,25 +132,6 @@ static void cfg_elm_end(GMarkupParseContext *context, cc->text = NULL; } - else if (!strcmp(element_name, "ForceHost") && cc->text) { - free(ourhost); - ourhost = cc->text; - cc->text = NULL; - } - - else if (!strcmp(element_name, "TDBRepPort") && cc->text) { - n = strtol(cc->text, NULL, 10); - if (n <= 0 || n >= 65536) { - fprintf(stderr, "warning: " - "TDBRepPort '%s' invalid, ignoring", cc->text); - free(cc->text); - cc->text = NULL; - return; - } - rep_port 
= n; - free(cc->text); - cc->text = NULL; - } } static bool str_n_isspace(const char *s, size_t n) @@ -198,8 +177,6 @@ static void read_config(void) memset(&ctx, 0, sizeof(struct config_context)); - rep_port = 8083; - if (!g_file_get_contents(config, &text, &len, NULL)) { fprintf(stderr, "failed to read config file %s\n", config); exit(1); @@ -603,10 +580,15 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state) return 0; } +static void tdb_state_cb(enum db_event event) +{ + if (event == TDB_EV_MASTER) + tdb_is_master = true; +} + int main(int argc, char *argv[]) { - char hostname[64]; - unsigned int env_flags, db_flags; + unsigned int db_flags; error_t aprc; int rc = 1; @@ -621,21 +603,12 @@ int main(int argc, char *argv[]) if (!tdb_dir) die("no tdb dir (-t) specified\n"); - if (ourhost) - strcpy(hostname, ourhost); - else if (gethostname(hostname, sizeof(hostname)) < 0) { - fprintf(stderr, "gethostname failed: %s\n", strerror(errno)); - return 1; - } - - env_flags = DB_RECOVER | DB_CREATE | DB_THREAD; - if (tdb_init(&tdb, tdb_dir, NULL, env_flags, - "tdbadm", false, NULL, hostname, rep_port, NULL)) + if (tdb_init(&tdb, tdb_dir, NULL, "tdbadm", false, + 0, NULL, true, tdb_state_cb)) goto err_dbinit; - /* Usually takes about 12s */ - /* FIXME don't peek into private parts of tdb struct, use state_cb */ - while (!tdb_is_master) + /* Usually takes about 12s, if vote is involved. */ + while (!tdb_is_master) sleep(2); db_flags = DB_CREATE | DB_THREAD;
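For readers following the state handling: cld_update_cb() in the server.c hunk compares the role that CLD wants for this node (state_want) with the state the database is actually in (state_tdb) and then upgrades, downgrades, or just logs a warning. The sketch below condenses that comparison into one pure function. It is only an illustration, not code from the patch: enum rep_action, reconcile(), action_name() and print_reconcile_table() are hypothetical names, and the real callback calls rtdb_restart() directly instead of returning an action.

```c
#include <assert.h>
#include <stdio.h>

/* Copies of the enums that the patch adds to server/tabled.h. */
enum st_want { ST_W_INIT, ST_W_MASTER, ST_W_SLAVE };
enum st_tdb { ST_TDB_INIT, ST_TDB_OPEN, ST_TDB_MASTER, ST_TDB_SLAVE, ST_TDBNUM };

/* Hypothetical: the patch acts in place and has no such enum. */
enum rep_action {
	ACT_NONE,	/* wanted and actual state agree */
	ACT_UPGRADE,	/* patch: rtdb_restart(&tdbrep, true) */
	ACT_DOWNGRADE,	/* patch: rtdb_restart(&tdbrep, false), data loss possible */
	ACT_WARN	/* patch: applog(LOG_WARNING, "Want ... while in state ...") */
};

/* Decide what to do when CLD reports the role it wants for this node. */
enum rep_action reconcile(enum st_want want, enum st_tdb have)
{
	switch (want) {
	case ST_W_MASTER:
		if (have == ST_TDB_MASTER)
			return ACT_NONE;	/* CLD caught up to the DB */
		if (have == ST_TDB_SLAVE)
			return ACT_UPGRADE;
		return ACT_WARN;
	case ST_W_SLAVE:
		if (have == ST_TDB_SLAVE)
			return ACT_NONE;
		if (have == ST_TDB_MASTER)
			return ACT_DOWNGRADE;
		return ACT_WARN;
	default:
		return ACT_NONE;	/* no opinion from CLD yet */
	}
}

const char *action_name(enum rep_action a)
{
	static const char *names[] = { "none", "upgrade", "downgrade", "warn" };

	assert(a >= ACT_NONE && a <= ACT_WARN);
	return names[a];
}

/* Print the decision for every combination of wanted and actual state. */
void print_reconcile_table(void)
{
	static const char *want_name[] = { "Init", "Master", "Slave" };
	static const char *have_name[] = { "Init", "Open", "Master", "Slave" };
	int w, h;

	for (w = ST_W_INIT; w <= ST_W_SLAVE; w++)
		for (h = ST_TDB_INIT; h < ST_TDBNUM; h++)
			printf("want %-6s have %-6s -> %s\n",
			       want_name[w], have_name[h],
			       action_name(reconcile((enum st_want)w,
						     (enum st_tdb)h)));
}
```

Calling print_reconcile_table() from a main() lists all twelve combinations. Keeping the decision separate from the side effects is only a convenience for this sketch; it makes the master/slave table easy to check without a running CLD or db4 environment.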