Re: [tabled patch 3/3] Fix metadata replication

On Tue, 10 Aug 2010 15:14:19 -0400
Jeff Garzik <jeff@xxxxxxxxxx> wrote:

> On 08/05/2010 11:40 PM, Pete Zaitcev wrote:

> > The metadata replication in tabled nominally existed, but did not
> > work. There were a couple of small bugs (such as an attempt to
> > boot directly into Slave state leading to a hang). However, the
> > biggest problem was that the identity of nodes in the Replication
> > Manager API had to be the same as the hostname. As a result, repmgr
> > code used the hostname to bind instead of a wildcard socket, and on
> > any stock Fedora or RHEL system it would end up listening on
> > loopback only, because /etc/hosts aliased the hostname to the
> > loopback address. Thus running any replication required additional
> > host configuration that could have unexpected consequences.
> > In addition, it was impossible to run two nodes on one host for testing.
> >
> > This patch does away with the Replication Manager and uses Base API
> > instead. This way, issues with host aliasing are addressed, and
> > the state transitions occur much faster because there is no voting.
> >
> > Note that the provision is added to run peers on the same host,
> > using a configuration clause TDBRepName. I was unable to come up with
> > a reliable way to make persistent, nonconflicting identifiers that
> > would replace hostnames. Fortunately, this should only be used
> > for build tests, where we probably can live with it.
> >
> > The resulting replication feature was tested to work. Not sure if
> > it is enough to trust it with one's data, but it's better than before.
> >
> > Signed-off-by: Pete Zaitcev <zaitcev@xxxxxxxxxx>
> >
> > ---
> >   doc/etc.tabled.conf |    8
> >   doc/setup.txt       |   13 +
> >   include/tdb.h       |   15 -
> >   lib/tdb.c           |  130 ++++++-------
> >   server/Makefile.am  |    2
> >   server/bucket.c     |   34 +--
> >   server/cldu.c       |  416 ++++++++++++++++++++++++++++++++++++------
> >   server/config.c     |   10 +
> >   server/object.c     |   22 +-
> >   server/replica.c    |    8
> >   server/server.c     |  404 ++++++++++++++++++++++++++++++++++++----
> >   server/tabled.h     |   97 +++++++++
> >   server/tdbadm.c     |   51 +----
> >   13 files changed, 963 insertions(+), 247 deletions(-)
> 
> Including metarep.c would be helpful ;-)
> 
> Will wait on release for this...

Sorry!

Unfortunately this is getting a little behind, because I started
working on the tests and they require some changes (e.g. the listening
host and port may be unknown at the time the MASTER file is locked).
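
For reference, the MASTER file written by cldu_set_master() in this patch
holds three "tag: value" lines (name, host, port). Below is a minimal,
standalone sketch of reading that format back. It is not the parser from
the patch: struct master_info, copy_field() and parse_master() are
hypothetical names used only for illustration, and the field sizes are
assumed to match the 65-byte name and host buffers used in cldu.c.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical container for the three fields; not a type from the patch. */
struct master_info {
	char name[65];
	char host[65];
	long port;
};

/* Copy len bytes of src into dst as a C string; -1 if empty or too long. */
static int copy_field(char *dst, size_t dstsz, const char *src, size_t len)
{
	if (len == 0 || len >= dstsz)
		return -1;
	memcpy(dst, src, len);
	dst[len] = 0;
	return 0;
}

/*
 * Parse "tag: value\n" lines from a buffer that need not be NUL-terminated.
 * Returns 0 on success, -1 if a field is missing, too long, or invalid.
 */
static int parse_master(const char *buf, size_t len, struct master_info *mi)
{
	const char *p = buf, *end = buf + len;
	char portbuf[15] = "";
	char *endp;

	assert(buf != NULL && mi != NULL);
	memset(mi, 0, sizeof(*mi));

	while (p < end) {
		const char *nl = memchr(p, '\n', (size_t)(end - p));
		const char *eol = nl ? nl : end;
		const char *colon = memchr(p, ':', (size_t)(eol - p));

		if (colon) {
			size_t taglen = (size_t)(colon - p);
			const char *val = colon + 1;
			size_t vallen;
			int rc = 0;

			while (val < eol && *val == ' ')
				val++;
			vallen = (size_t)(eol - val);

			if (taglen == 4 && !memcmp(p, "name", 4))
				rc = copy_field(mi->name, sizeof(mi->name),
						val, vallen);
			else if (taglen == 4 && !memcmp(p, "host", 4))
				rc = copy_field(mi->host, sizeof(mi->host),
						val, vallen);
			else if (taglen == 4 && !memcmp(p, "port", 4))
				rc = copy_field(portbuf, sizeof(portbuf),
						val, vallen);
			/* Unknown tags are skipped. */
			if (rc)
				return -1;
		}
		p = nl ? nl + 1 : end;
	}

	if (!mi->name[0] || !mi->host[0] || !portbuf[0])
		return -1;

	/* Convert from the NUL-terminated copy, never from the raw buffer. */
	mi->port = strtol(portbuf, &endp, 10);
	if (*endp != 0 || mi->port <= 0 || mi->port >= 65536)
		return -1;
	return 0;
}
```

The port is converted from the terminated copy because the data read from
CLD is a length-delimited buffer, so nothing guarantees a terminator after
the last line.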

---
 doc/etc.tabled.conf |    8 
 doc/setup.txt       |   13 
 include/tdb.h       |   15 
 lib/tdb.c           |  130 ++--
 server/Makefile.am  |    2 
 server/bucket.c     |   34 -
 server/cldu.c       |  416 ++++++++++++--
 server/config.c     |   10 
 server/metarep.c    | 1245 ++++++++++++++++++++++++++++++++++++++++++
 server/object.c     |   22 
 server/replica.c    |    8 
 server/server.c     |  404 ++++++++++++-
 server/tabled.h     |   97 +++
 server/tdbadm.c     |   51 -
 14 files changed, 2208 insertions(+), 247 deletions(-)
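
The message header added in server/metarep.c documents its flags word as
version in bits <31:28> and rep_msg_type in bits <7:0>. A small sketch of
packing and unpacking that layout follows. The helper names and the
REP_* macros are hypothetical and do not appear in the patch. Byte order
on the wire is not covered here.

```c
#include <assert.h>
#include <stdint.h>

/* Same enumerators as in metarep.c. */
enum rep_msg_type { REP_MSG_NOP, REP_MSG_LOGIN, REP_MSG_LOGOK, REP_MSG_DATA };

/* Hypothetical names for the documented bit layout. */
#define REP_VER_SHIFT	28
#define REP_VER_MAX	0xfu
#define REP_TYPE_MASK	0xffu

/* Build the flags word: version in <31:28>, message type in <7:0>. */
static uint32_t rep_flags_pack(unsigned int version, enum rep_msg_type type)
{
	assert(version <= REP_VER_MAX);
	return ((uint32_t)version << REP_VER_SHIFT) |
	       ((uint32_t)type & REP_TYPE_MASK);
}

/* Extract the 4-bit version field. */
static unsigned int rep_flags_version(uint32_t flags)
{
	return flags >> REP_VER_SHIFT;
}

/* Extract the 8-bit message type field. */
static enum rep_msg_type rep_flags_type(uint32_t flags)
{
	return (enum rep_msg_type)(flags & REP_TYPE_MASK);
}
```

Keeping the unused bits <27:8> at zero, as done here, leaves room to
assign them later without confusing older peers.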

diff --git a/doc/etc.tabled.conf b/doc/etc.tabled.conf
index 22d20a7..c3b1d1d 100644
--- a/doc/etc.tabled.conf
+++ b/doc/etc.tabled.conf
@@ -13,12 +13,12 @@
 
 <!--
   One group per DB, don't skimp on groups. Also, make sure the replication
-  ports do not conflict when you make hosts to host several groups.
-  Unfortunately, the diagnostics are not very good if they do.
-  Most likely you'll see database corruption in such cases.
+  ports do not conflict when you make boxes to host several groups or use
+  replication instances with TDBRepName.
   -->
 <Group>ultracart2</Group>
-<TDB>/path/tabled/tdb</TDB>
+<TDB>/path/tabled-uc2/</TDB>        <!-- mkdir -p /path/tabled-uc2 -->
+<!-- <TDBRepName>12345.my_local_node_name.example.com</TDBRepName> -->
 <TDBRepPort>8083</TDBRepPort>
 
 <!--
diff --git a/doc/setup.txt b/doc/setup.txt
index ac0dfb0..c7a4c6a 100644
--- a/doc/setup.txt
+++ b/doc/setup.txt
@@ -15,7 +15,9 @@ _cld._udp.phx2.ex.com has SRV record 10 50 8081 maika.phx2.ex.com.
    Also, make sure that your hostname has a domain. We don't want to search
    for CLD in the world-wide DNS root, do we?
 
-   Make sure CLD is up (run "cldcli" to verify).
+   Once you know that CLD is running, verify that tabled can talk to
+   it by running "cldcli". UDP traffic must be allowed on port 8081, or
+   on whatever other port the SRV record specifies.
 
 *) Another thing to set up in DNS is a wildcard host for the system where
    tabled will run. Unlike the SRV records of CLD, this is optional, but
@@ -30,6 +32,10 @@ emus3           IN      A       192.168.128.9
    All examples on Google say FQDN is required, and most presume aliasing
    of A and AAAA records, but BIND 9 eats the above fine.
 
+*) Speaking of FQDN, it is possible to force tabled to use a non-default
+   hostname with the ForceHost tag. In practice this is only useful when
+   the DNS is broken.
+
 *) Copy configuration file from doc/etc.tabled.conf to /etc/tabled.conf
    and edit to suit (see configurable items below). Notice that the file
    looks like XML, but is not really. In particular, names of elements are
@@ -53,6 +59,11 @@ emus3           IN      A       192.168.128.9
    Group name defaults to "default", so you can leave this element unset,
    but don't do it. Any name, even "qwerty", is better than the default.
 
+*) In each group, tabled uses its hostname to identify itself. However,
+   if you ever wish to run two tabled processes that serve the same group,
+   it can be accomplished by setting TDBRepName. N.B.: A loss of power for
+   the host will knock out all of them, so never use this in production.
+
 *) Select the port to listen, if desired. This is done using the <Listen>
    element:
 
diff --git a/include/tdb.h b/include/tdb.h
index 8895704..ff3b4b5 100644
--- a/include/tdb.h
+++ b/include/tdb.h
@@ -109,15 +109,12 @@ struct tabledb {
 	DB		*oids;			/* object ID db */
 };
 
-struct db_remote {	/* remotes for tdb_init */
-	char *host;
-	unsigned short port;
-};
-
-extern int tdb_init(struct tabledb *tdb, const char *home, const char *pass,
-	unsigned int env_flags, const char *errpfx, bool do_syslog,
-	GList *remotes, char *rep_host, unsigned short rep_port,
-	void (*cb)(enum db_event));
+extern int tdb_init(struct tabledb *tdb, const char *db_home,
+	const char *db_password, const char *errpfx, bool do_syslog,
+	int rep_our_id,
+	int (*rep_send)(DB_ENV *dbenv, const DBT *ctl, const DBT *rec,
+			const DB_LSN *lsnp, int envid, uint32_t flags),
+	bool we_are_master, void (*cb)(enum db_event));
 extern int tdb_up(struct tabledb *tdb, unsigned int open_flags);
 extern void tdb_down(struct tabledb *tdb);
 extern void tdb_fini(struct tabledb *tdb);
diff --git a/lib/tdb.c b/lib/tdb.c
index bc5e50a..29a18f0 100644
--- a/lib/tdb.c
+++ b/lib/tdb.c
@@ -1,6 +1,6 @@
 
 /*
- * Copyright 2008-2009 Red Hat, Inc.
+ * Copyright 2008-2010 Red Hat, Inc.
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
@@ -148,35 +148,15 @@ err_out:
 	return -EIO;
 }
 
-static int add_remote_sites(DB_ENV *dbenv, GList *remotes, int *nsites)
-{
-	int rc;
-	struct db_remote *rp;
-	GList *tmp;
-
-	*nsites = 0;
-	for (tmp = remotes; tmp; tmp = tmp->next) {
-		rp = tmp->data;
-
-		rc = dbenv->repmgr_add_remote_site(dbenv, rp->host, rp->port,
-						   NULL, 0);
-		if (rc) {
-			dbenv->err(dbenv, rc,
-				   "dbenv->add.remote.site host %s port %u",
-				   rp->host, rp->port);
-			return rc;
-		}
-		(*nsites)++;
-	}
-
-	return 0;
-}
-
 static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
 {
 	struct tabledb *tdb = dbenv->app_private;
 
 	switch (event) {
+	case DB_EVENT_PANIC:
+		dbenv->errx(dbenv, "PANIC event is reported, exiting");
+		exit(2);
+		break;
 	case DB_EVENT_REP_CLIENT:
 		tdb->is_master = false;
 		if (tdb->state_cb)
@@ -191,6 +171,14 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
 		if (tdb->state_cb)
 			(*tdb->state_cb)(TDB_EV_ELECTED);
 		break;
+	case DB_EVENT_REP_NEWMASTER:
+		dbenv->errx(dbenv, "New master is reported: %d",
+			    *(int *)event_info);
+		/* XXX Need to verify that it's the same master as before. */
+		break;
+	case DB_EVENT_REP_STARTUPDONE:
+		dbenv->errx(dbenv, "Client start-up complete");
+		break;
 	default:
 		/* do nothing */
 		break;
@@ -202,15 +190,18 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
  * db_password, cb can be NULL
  */
 int tdb_init(struct tabledb *tdb, const char *db_home, const char *db_password,
-	     unsigned int env_flags, const char *errpfx, bool do_syslog,
-	     GList *remotes, char *rep_host, unsigned short rep_port,
+	     const char *errpfx, bool do_syslog, int rep_ourid,
+	     int (*rep_send)(DB_ENV *dbenv, const DBT *ctl, const DBT *rec,
+			     const DB_LSN *lsnp, int envid, uint32_t flags),
+	     bool we_are_master,
 	     void (*cb)(enum db_event))
 {
-	int nsites;
+	unsigned int env_flags;
+	unsigned int rep_flags;
 	int rc;
 	DB_ENV *dbenv;
 
-	tdb->is_master = false;
+	tdb->is_master = we_are_master;
 	tdb->home = db_home;
 	tdb->state_cb = cb;
 
@@ -258,12 +249,6 @@ int tdb_init(struct tabledb *tdb, const char *db_home, const char *db_password,
 		tdb->keyed = true;
 	}
 
-	rc = dbenv->repmgr_set_local_site(dbenv, rep_host, rep_port, 0);
-	if (rc) {
-		dbenv->err(dbenv, rc, "repmgr_set_local_site");
-		goto err_out;
-	}
-
 	rc = dbenv->set_event_notify(dbenv, db4_event);
 	if (rc) {
 		dbenv->err(dbenv, rc, "set_event_notify");
@@ -283,42 +268,65 @@ int tdb_init(struct tabledb *tdb, const char *db_home, const char *db_password,
 	// 	goto err_out;
 	// }
 
-	rc = dbenv->rep_set_priority(dbenv, 100);
-	if (rc) {
-		dbenv->err(dbenv, rc, "rep_set_priority");
-		goto err_out;
-	}
+	if (rep_send) {
+		rc = dbenv->rep_set_transport(dbenv, rep_ourid, rep_send);
+		if (rc) {
+			dbenv->err(dbenv, rc, "rep_set_transport");
+			goto err_out;
+		}
 
-	/* init DB transactional environment, stored in directory db_home */
-	env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
-	env_flags |= DB_INIT_TXN | DB_INIT_REP;
-	rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
-	if (rc) {
-		dbenv->err(dbenv, rc, "open(dbenv)");
-		goto err_out;
-	}
+		// /*
+		//  * Fix the derbies. This is the only way, since passing of
+		//  * DB_REP_MASTER to rep_start() after a failover will end in:
+		//  * "DB_REP_UNAVAIL: Unable to elect a master" (and a hang).
+		//  */
+		// rc = dbenv->rep_set_priority(dbenv, we_are_master ? 100 : 10);
+		// if (rc) {
+		// 	dbenv->err(dbenv, rc, "rep_set_priority");
+		// 	goto err_out;
+		// }
+
+		env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
+		env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
+		env_flags |= DB_INIT_TXN | DB_INIT_REP;
+		rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
+		if (rc) {
+			dbenv->err(dbenv, rc, "open rep");
+			goto err_out;
+		}
 
-	rc = add_remote_sites(dbenv, remotes, &nsites);
-	if (rc)
-		goto err_out;
+		rep_flags = we_are_master ? DB_REP_MASTER : DB_REP_CLIENT;
+		rc = dbenv->rep_start(dbenv, NULL, rep_flags);
+		if (rc) {
+			dbenv->err(dbenv, rc, "rep_start");
+			goto err_out;
+		}
 
-	// rc = dbenv->rep_set_nsites(dbenv, nsites + 1);
-	// if (rc) {
-	// 	dbenv->err(dbenv, rc, "rep_set_nsites");
-	// 	goto err_out;
-	// }
+	} else {
+		env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
+		env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
+		env_flags |= DB_INIT_TXN;
+		rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
+		if (rc) {
+			dbenv->err(dbenv, rc, "open norep");
+			goto err_out;
+		}
 
-	rc = dbenv->repmgr_start(dbenv, 2, DB_REP_ELECTION);
-	if (rc) {
-		dbenv->err(dbenv, rc, "repmgr_start");
-		goto err_out;
+		/* XXX rip this out from tdbadm.c */
+		/*
+		 * The db4 only delivers callbacks if replication was ordered.
+		 * Since we force-set master, we ought to deliver them here
+		 * for the universal code to work as if a master was elected.
+		 */
+		if (cb)
+			(*cb)(we_are_master ? TDB_EV_MASTER : TDB_EV_CLIENT);
 	}
 
 	return 0;
 
 err_out:
 	dbenv->close(dbenv, 0);
-	return rc;
+	return -1;
 }
 
 /*
diff --git a/server/Makefile.am b/server/Makefile.am
index 6397245..5b53a0a 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -4,7 +4,7 @@ INCLUDES	= -I$(top_srcdir)/include @GLIB_CFLAGS@ @HAIL_CFLAGS@
 sbin_PROGRAMS	= tabled tdbadm
 
 tabled_SOURCES	= tabled.h		\
-		  bucket.c cldu.c config.c object.c replica.c \
+		  bucket.c cldu.c config.c metarep.c object.c replica.c \
 		  server.c status.c storage.c storparse.c util.c
 tabled_LDADD	= ../lib/libtdb.a		\
 		  @HAIL_LIBS@ @PCRE_LIBS@ @GLIB_LIBS@ \
diff --git a/server/bucket.c b/server/bucket.c
index a95d23e..eb03e03 100644
--- a/server/bucket.c
+++ b/server/bucket.c
@@ -43,11 +43,11 @@ bool has_access(const char *user, const char *bucket, const char *key,
 	size_t alloc_len, key_len = 0;
 	struct db_acl_key *acl_key;
 	struct db_acl_ent *acl;
-	DB_ENV *dbenv = tdb.env;
+	DB_ENV *dbenv = tdbrep.tdb.env;
 	DB_TXN *txn = NULL;
 	DBT pkey, pval;
 	DBC *cur = NULL;
-	DB *acls = tdb.acls;
+	DB *acls = tdbrep.tdb.acls;
 
 	if (user == NULL)
 		user = DB_ACL_ANON;
@@ -132,7 +132,7 @@ err_out:
 static int add_access_user(DB_TXN *txn, const char *bucket, const char *key,
 			   const char *user, const char *perms)
 {
-	DB *acls = tdb.acls;
+	DB *acls = tdbrep.tdb.acls;
 	int key_len;
 	int acl_len;
 	struct db_acl_ent *acl;
@@ -203,8 +203,8 @@ bool service_list(struct client *cli, const char *user)
 	bool rcb;
 	DB_TXN *txn = NULL;
 	DBC *cur = NULL;
-	DB_ENV *dbenv = tdb.env;
-	DB *bidx = tdb.buckets_idx;
+	DB_ENV *dbenv = tdbrep.tdb.env;
+	DB *bidx = tdbrep.tdb.buckets_idx;
 	DBT skey, pkey, pval;
 
 	if (asprintf(&s,
@@ -348,7 +348,7 @@ bool bucket_valid(const char *bucket)
 static int bucket_find(DB_TXN *txn, const char *bucket, char *owner,
 		       int owner_len)
 {
-	DB *buckets = tdb.buckets;
+	DB *buckets = tdbrep.tdb.buckets;
 	DBT key, val;
 	struct db_bucket_ent ent;
 	int rc;
@@ -455,9 +455,9 @@ bool bucket_add(struct client *cli, const char *user, const char *bucket)
 	struct db_bucket_ent ent;
 	bool setacl;			/* is ok to put pre-existing bucket */
 	enum ReqACLC canacl;
-	DB *buckets = tdb.buckets;
-	DB *acls = tdb.acls;
-	DB_ENV *dbenv = tdb.env;
+	DB *buckets = tdbrep.tdb.buckets;
+	DB *acls = tdbrep.tdb.acls;
+	DB_ENV *dbenv = tdbrep.tdb.env;
 	DB_TXN *txn = NULL;
 	DBT key, val;
 
@@ -589,11 +589,11 @@ bool bucket_del(struct client *cli, const char *user, const char *bucket)
 	enum errcode err = InternalError;
 	int rc;
 	struct db_bucket_ent ent;
-	DB_ENV *dbenv = tdb.env;
+	DB_ENV *dbenv = tdbrep.tdb.env;
 	DB_TXN *txn = NULL;
-	DB *buckets = tdb.buckets;
-	DB *acls = tdb.acls;
-	DB *objs = tdb.objs;
+	DB *buckets = tdbrep.tdb.buckets;
+	DB *acls = tdbrep.tdb.acls;
+	DB *objs = tdbrep.tdb.objs;
 	DBC *cur = NULL;
 	DBT key, val;
 	char structbuf[sizeof(struct db_acl_key) + 32];
@@ -922,9 +922,9 @@ static bool bucket_list_keys(struct client *cli, const char *user,
 	size_t pfx_len;
 	struct bucket_list_info bli;
 	bool rcb;
-	DB_ENV *dbenv = tdb.env;
+	DB_ENV *dbenv = tdbrep.tdb.env;
 	DB_TXN *txn = NULL;
-	DB *objs = tdb.objs;
+	DB *objs = tdbrep.tdb.objs;
 	DBC *cur = NULL;
 	DBT pkey, pval;
 	struct db_obj_key *obj_key;
@@ -1159,8 +1159,8 @@ bool access_list(struct client *cli, const char *bucket, const char *key,
 
 	GHashTable *param;
 	enum errcode err = InternalError;
-	DB_ENV *dbenv = tdb.env;
-	DB *acls = tdb.acls;
+	DB_ENV *dbenv = tdbrep.tdb.env;
+	DB *acls = tdbrep.tdb.acls;
 	int alloc_len;
 	char owner[64];
 	GList *res;
diff --git a/server/cldu.c b/server/cldu.c
index 5f3631b..45a6a83 100644
--- a/server/cldu.c
+++ b/server/cldu.c
@@ -35,6 +35,8 @@
 
 #define ALIGN8(n)	((8 - ((n) & 7)) & 7)
 
+#define MASTER_FILE	"MASTER"
+
 struct chunk_node {
 	struct list_head link;
 	char name[65];
@@ -63,18 +65,22 @@ struct cld_session {
 	int actx;		/* Active host cldv[actx] */
 	struct cld_host cldv[N_CLD];
 
+	char *thisname;
 	char *thisgroup;
 	char *thishost;
 	char *cfname;		/* /tabled-group directory */
 	struct ncld_fh *cfh;	/* /tabled-group directory, keep open for scan */
-	char *ffname;		/* /tabled-group/thishost */
-	struct ncld_fh *ffh;	/* /tabled-group/thishost, keep open for lock */
+	char *ffname;		/* /tabled-group/thisname */
+	struct ncld_fh *ffh;	/* /tabled-group/thisname, keep open for lock */
+	char *mfname;		/* /tabled-group/MASTER */
+	struct ncld_fh *mfh;	/* /tabled-group/MASTER, keep open for lock */
 	char *xfname;		/* /chunk-GROUP directory */
 
 	struct list_head chunks;	/* found in xfname, struct chunk_node */
 };
 
 static int cldu_set_cldc(struct cld_session *sp, int newactive);
+static int scan_peers(struct cld_session *sp);
 static int scan_chunks(struct cld_session *sp);
 static void next_chunk(struct cld_session *sp, struct chunk_node *np);
 static void add_remote(const char *name);
@@ -113,13 +119,17 @@ static int cldu_nextactive(struct cld_session *sp)
  * chunkservers that it uses, so this function only takes one group argument.
  */
 static int cldu_setgroup(struct cld_session *sp,
-			const char *thisgroup, const char *thishost)
+			 const char *thisgroup, const char *thishost,
+			 const char *thisname)
 {
 	char *mem;
 
 	if (thisgroup == NULL) {
 		thisgroup = "default";
 	}
+	if (thisname == NULL) {
+		thisname = thishost;
+	}
 
 	sp->thisgroup = strdup(thisgroup);
 	if (!sp->thisgroup)
@@ -127,15 +137,22 @@ static int cldu_setgroup(struct cld_session *sp,
 	sp->thishost = strdup(thishost);
 	if (!sp->thishost)
 		goto err_oom;
+	sp->thisname = strdup(thisname);
+	if (!sp->thisname)
+		goto err_oom;
 
 	if (asprintf(&mem, "/tabled-%s", thisgroup) == -1)
 		goto err_oom;
 	sp->cfname = mem;
 
-	if (asprintf(&mem, "/tabled-%s/%s", thisgroup, thishost) == -1)
+	if (asprintf(&mem, "/tabled-%s/%s", thisgroup, thisname) == -1)
 		goto err_oom;
 	sp->ffname = mem;
 
+	if (asprintf(&mem, "/tabled-%s/%s", thisgroup, MASTER_FILE) == -1)
+		goto err_oom;
+	sp->mfname = mem;
+
 	if (asprintf(&mem, "/chunk-%s", thisgroup) == -1)
 		goto err_oom;
 	sp->xfname = mem;
@@ -147,6 +164,259 @@ err_oom:
 	return 0;
 }
 
+/*
+ * Ugh, side effects on tabled_srv.rep_master.
+ */
+static void cldu_parse_master(const char *mfname, const char *mfile, long len)
+{
+	enum lex_state { lex_tag, lex_colon, lex_val };
+	const char *tag, *val;
+	int taglen;
+	const char *name, *host, *port;
+	int namelen, hostlen, portlen;
+	char namebuf[65], hostbuf[65], portbuf[15];
+	long portnum;
+	enum lex_state state;
+	struct db_remote *rp;
+	const char *p;
+	char c;
+
+	name = NULL;
+	namelen = 0;
+	host = NULL;
+	hostlen = 0;
+	port = NULL;
+	portlen = 0;
+
+	p = mfile;
+	tag = p;
+	val = NULL;
+	state = lex_tag;
+	for (;;) {
+		if (p >= mfile+len)
+			break;
+		c = *p++;
+		if (state == lex_tag) {
+			if (c == ':') {
+				val = p;
+				state = lex_colon;
+				taglen = (p-1) - tag;
+			} else if (c == '\n') {
+				if (debugging)
+					applog(LOG_DEBUG,
+					       "%s: No colon", mfname);
+				tag = p;
+				val = NULL;
+				state = lex_tag;
+			}
+		} else if (state == lex_colon) {
+			if (c == ' ') {
+				val = p;
+			} else if (c == '\n') {
+				if (debugging)
+					applog(LOG_DEBUG,
+					       "%s: Empty value", mfname);
+				tag = p;
+				val = NULL;
+				state = lex_tag;
+			} else {
+				state = lex_val;
+			}
+		} else if (state == lex_val) {
+			if (c == '\n') {
+				if (taglen == sizeof("name")-1 &&
+				    memcmp(tag, "name", taglen) == 0) {
+					name = val;
+					namelen = (p-1) - val;
+				} else if (taglen == sizeof("host")-1 &&
+				    memcmp(tag, "host", taglen) == 0) {
+					host = val;
+					hostlen = (p-1) - val;
+				} else if (taglen == sizeof("port")-1 &&
+				    memcmp(tag, "port", taglen) == 0) {
+					port = val;
+					portlen = (p-1) - val;
+				} else {
+					if (debugging)
+						applog(LOG_DEBUG,
+						       "%s: Unknown tag %c[%d]",
+						       mfname, tag[0], taglen);
+				}
+				tag = p;
+				val = NULL;
+				state = lex_tag;
+			}
+		} else {
+			return;
+		}
+	}
+
+	if (!name || !namelen) {
+		if (debugging)
+			applog(LOG_DEBUG, "%s: No name", mfname);
+		return;
+	}
+	if (!host || !hostlen) {
+		if (debugging)
+			applog(LOG_DEBUG, "%s: No host", mfname);
+		return;
+	}
+	if (!port || !portlen) {
+		if (debugging)
+			applog(LOG_DEBUG, "%s: No port", mfname);
+		return;
+	}
+
+	if (namelen >= sizeof(namebuf)) {
+		applog(LOG_ERR, "Long master name");
+		return;
+	}
+	memcpy(namebuf, name, namelen);
+	namebuf[namelen] = 0;
+
+	if (hostlen >= sizeof(hostbuf)) {
+		applog(LOG_ERR, "Long host");
+		return;
+	}
+	memcpy(hostbuf, host, hostlen);
+	hostbuf[hostlen] = 0;
+
+	if (portlen >= sizeof(portbuf)) {
+		applog(LOG_ERR, "Long port");
+		return;
+	}
+	memcpy(portbuf, port, portlen);
+	portbuf[portlen] = 0;
+	portnum = strtol(portbuf, NULL, 10);
+	if (portnum <= 0 || portnum >= 65536) {
+		applog(LOG_ERR, "Bad port %s", portbuf);
+		return;
+	}
+
+	rp = tdb_find_remote_byname(namebuf);
+	if (!rp) {
+		if (debugging)
+			applog(LOG_DEBUG, "%s: Not found master %s",
+			       mfname, namebuf);
+		return;
+	}
+
+	if (debugging)
+		applog(LOG_DEBUG, "Found master %s host %s port %ld",
+		       namebuf, hostbuf, portnum);
+
+	rp->host = strdup(hostbuf);
+	rp->port = portnum;
+	if (!rp->host)
+		return;
+	tabled_srv.rep_master = rp;
+}
+
+static void cldu_get_master(const char *mfname, struct ncld_fh *mfh)
+{
+	struct ncld_read *nrp;
+	struct timespec tm;
+	int error;
+
+	nrp = ncld_get(mfh, &error);
+	if (!nrp) {
+		applog(LOG_ERR, "CLD get(%s) failed: %d", mfname, error);
+		return;
+	}
+
+	if (nrp->length < 3) {
+		ncld_read_free(nrp);
+
+		/*
+		 * Since master opens, locks, and writes, in that order,
+		 * there's a gap between the lock and write. So, unrace a bit.
+		 */
+		tm.tv_sec = 2;
+		tm.tv_nsec = 0;
+		nanosleep(&tm, NULL);
+
+		nrp = ncld_get(mfh, &error);
+		if (!nrp) {
+			applog(LOG_ERR, "CLD get(%s) failed: %d", mfname, error);
+			return;
+		}
+
+		if (nrp->length < 3) {
+			applog(LOG_ERR, "CLD master(%s) is empty", mfname);
+			ncld_read_free(nrp);
+			return;
+		}
+	}
+
+	cldu_parse_master(mfname, nrp->ptr, nrp->length);
+	ncld_read_free(nrp);
+}
+
+/*
+ * Lock the MASTER file, write or read it as needed.
+ * N.B. Only call this if you know that mfh is closed or was never opened:
+ * right after cldu_set_cldc (disposing of the session closes handles),
+ * or when we were a slave and so should not have kept mfh ...
+ * FIXME this will become more interesting when we keep mfh open in slave
+ * state so we can have outstanding locks for master failover notification.
+ */
+static int cldu_set_master(struct cld_session *sp)
+{
+	char *buf;
+	int len;
+	int error;
+	int rc;
+
+	if (!sp->nsp)
+		return -1;
+
+	/* Maybe drop this later, after notifications work. */
+	if (debugging) {
+		rc = g_list_length(sp->nsp->handles);
+		applog(LOG_DEBUG, "open handles %d", rc);
+	}
+
+	sp->mfh = ncld_open(sp->nsp, sp->mfname,
+			    COM_READ | COM_WRITE | COM_LOCK | COM_CREATE,
+			    &error, 0, NULL, NULL);
+	if (!sp->mfh) {
+		applog(LOG_ERR, "CLD open(%s) failed: %d", sp->mfname, error);
+		goto err_open;
+	}
+
+	error = ncld_trylock(sp->mfh);
+	if (error) {
+		applog(LOG_INFO, "CLD lock(%s) failed: %d", sp->mfname, error);
+		cldu_get_master(sp->mfname, sp->mfh);
+		goto err_lock;
+	}
+
+	len = asprintf(&buf, "name: %s\nhost: %s\nport: %u\n",
+		       sp->thisname, sp->thishost, tabled_srv.rep_port);
+	if (len < 0) {
+		applog(LOG_ERR, "internal error: no core");
+		goto err_wmem;
+	}
+
+	rc = ncld_write(sp->mfh, buf, len);
+	if (rc) {
+		applog(LOG_ERR, "CLD put(%s) failed: %d", sp->mfname, rc);
+		goto err_write;
+	}
+
+	free(buf);
+	return 0;
+
+err_write:
+	free(buf);
+err_wmem:
+	/* ncld_unlock() - close will unlock */
+err_lock:
+	ncld_close(sp->mfh);
+err_open:
+	return -1;
+}
+
 static void cldu_tm_rescan(int fd, short events, void *userdata)
 {
 	struct cld_session *sp = userdata;
@@ -162,14 +432,37 @@ static void cldu_tm_rescan(int fd, short events, void *userdata)
 			sp->nsp = NULL;
 		}
 		newactive = cldu_nextactive(sp);
-		if (cldu_set_cldc(sp, newactive)) {
-			evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
-			return;
+		if (cldu_set_cldc(sp, newactive))
+			goto out;
+
+		if (cldu_set_master(sp) == 0) {
+			tabled_srv.state_want = ST_W_MASTER;
+		} else {
+			if (debugging)
+				applog(LOG_DEBUG, "Unable to relock %s",
+				       sp->mfname);
+			tabled_srv.state_want = ST_W_SLAVE;
 		}
+		cld_update_cb();
+
 		sp->is_dead = false;
+	} else {
+		if (tabled_srv.state_want == ST_W_SLAVE) {
+			if (cldu_set_master(sp) == 0) {
+				tabled_srv.state_want = ST_W_MASTER;
+			} else {
+				if (debugging)
+					applog(LOG_DEBUG, "Unable to lock %s",
+					       sp->mfname);
+			}
+		}
 	}
 
+	if (scan_peers(sp) != 0)
+		goto out;
 	scan_chunks(sp);
+
+ out:
 	evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
 }
 
@@ -201,12 +494,6 @@ static void cldu_sess_event(void *priv, uint32_t what)
 static int cldu_set_cldc(struct cld_session *sp, int newactive)
 {
 	struct cldc_host *hp;
-	struct ncld_read *nrp;
-	char buf[100];
-	const char *ptr;
-	int dir_len;
-	int total_len, rec_len, name_len;
-	int len;
 	struct timespec tm;
 	int error;
 	int rc;
@@ -261,6 +548,7 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
 
 	/*
 	 * Then, create the membership file for us.
+	 * We lock it in case of two tabled running with same name by mistake.
 	 */
 	sp->ffh = ncld_open(sp->nsp, sp->ffname,
 			    COM_WRITE | COM_LOCK | COM_CREATE,
@@ -285,11 +573,7 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
 		/*
 		 * The usual reason why we get a lock conflict is
 		 * restarting too quickly and hitting the previous lock
-		 * that is going to disappear soon.
-		 *
-		 * FIXME: However, it may also be that a master
-		 * is ok we we should become a slave, e.g. start TDB.
-		 * We do not support multi-node, but we should.
+		 * that is going to disappear soon. Just wait it out.
 		 */
 		tm.tv_sec = 10;
 		tm.tv_nsec = 0;
@@ -299,21 +583,43 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
 	/*
 	 * Write the file with our connection parameters.
 	 */
-	len = snprintf(buf, sizeof(buf), "port: %u\n", tabled_srv.rep_port);
-	if (len >= sizeof(buf)) {
-		applog(LOG_ERR, "internal error: overflow for port (%d)", len);
-		goto err_wmem;
-	}
-
-	rc = ncld_write(sp->ffh, buf, len);
+	rc = ncld_write(sp->ffh, "-\n", 2);
 	if (rc) {
 		applog(LOG_ERR, "CLD put(%s) failed: %d", sp->ffname, rc);
 		goto err_write;
 	}
 
 	/*
-	 * Read the directory.
+	 * Finally, scan cfh to find peers, add with global effects.
 	 */
+	if (scan_peers(sp) != 0)
+		goto err_pscan;
+
+	return 0;
+
+err_pscan:
+err_write:
+err_lock:
+	ncld_close(sp->ffh);	/* session-close closes these, maybe drop */
+err_fopen:
+	ncld_close(sp->cfh);
+err_copen:
+	ncld_sess_close(sp->nsp);
+	sp->nsp = NULL;
+err_nsess:
+err_addr:
+	return -1;
+}
+
+static int scan_peers(struct cld_session *sp)
+{
+	struct ncld_read *nrp;
+	char buf[65];
+	const char *ptr;
+	int dir_len;
+	int total_len, rec_len, name_len;
+	int error;
+
 	nrp = ncld_get(sp->cfh, &error);
 	if (!nrp) {
 		applog(LOG_ERR, "CLD get(%s) failed: %d", sp->cfname, error);
@@ -336,13 +642,20 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
 		else
 			buf[64] = 0;
 
-		if (!strcmp(buf, sp->thishost)) {
+		if (!strcmp(buf, MASTER_FILE)) {
+			; /* ignore special entry */
+		} else if (!strcmp(buf, sp->thisname)) {
 			if (debugging)
 				applog(LOG_DEBUG, " %s (ourselves)", buf);
 		} else {
-			if (debugging)
-				applog(LOG_DEBUG, " %s", buf);
-			add_remote(buf);
+			if (tdb_find_remote_byname(buf)) {
+				if (debugging)
+					applog(LOG_DEBUG, " %s", buf);
+			} else {
+				if (debugging)
+					applog(LOG_DEBUG, " %s (new)", buf);
+				add_remote(buf);
+			}
 		}
 
 		ptr += total_len;
@@ -350,21 +663,9 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
 	}
 
 	ncld_read_free(nrp);
-
 	return 0;
 
 err_dread:
-err_write:
-err_wmem:
-err_lock:
-	ncld_close(sp->ffh);	/* session-close closes these, maybe drop */
-err_fopen:
-	ncld_close(sp->cfh);
-err_copen:
-	ncld_sess_close(sp->nsp);
-	sp->nsp = NULL;
-err_nsess:
-err_addr:
 	return -1;
 }
 
@@ -508,9 +809,6 @@ err_mem:
 	return;
 }
 
-/*
- * FIXME need to read port number from the file (port:<space>num).
- */
 static void add_remote(const char *name)
 {
 	struct db_remote *rp;
@@ -518,10 +816,15 @@ static void add_remote(const char *name)
 	rp = malloc(sizeof(struct db_remote));
 	if (!rp)
 		return;
+	memset(rp, 0, sizeof(struct db_remote));
+
+	/*
+	 * Master assigns global IDs now, distributes them in login protocol.
+	 */
+	rp->dbid = DBID_NONE;
 
-	rp->port = 8083;
-	rp->host = strdup(name);
-	if (!rp->host) {
+	rp->name = strdup(name);
+	if (!rp->name) {
 		free(rp);
 		return;
 	}
@@ -564,7 +867,8 @@ void cld_init()
 /*
  * This initiates our sole session with a CLD instance.
  */
-int cld_begin(const char *thishost, const char *thisgroup, int verbose)
+int cld_begin(const char *thishost, const char *thisgroup,
+	      const char *thisname, int verbose)
 {
 	static struct cld_session *sp = &ses;
 	struct timespec tm;
@@ -575,7 +879,7 @@ int cld_begin(const char *thishost, const char *thisgroup, int verbose)
 
 	evtimer_set(&ses.tm_rescan, cldu_tm_rescan, &ses);
 
-	if (cldu_setgroup(sp, thisgroup, thishost)) {
+	if (cldu_setgroup(sp, thisgroup, thishost, thisname)) {
 		/* Already logged error */
 		goto err_group;
 	}
@@ -626,6 +930,14 @@ int cld_begin(const char *thishost, const char *thisgroup, int verbose)
 		newactive = cldu_nextactive(sp);
 	}
 
+	if (cldu_set_master(sp) == 0) {
+		if (debugging)
+			applog(LOG_DEBUG, "Locked %s", sp->mfname);
+		tabled_srv.state_want = ST_W_MASTER;
+	} else {
+		tabled_srv.state_want = ST_W_SLAVE;
+	}
+
 	retry_cnt = 0;
 	for (;;) {
 		if (!scan_chunks(sp))
@@ -696,8 +1008,12 @@ void cld_end(void)
 	sp->ffname = NULL;
 	free(sp->xfname);
 	sp->xfname = NULL;
+	free(sp->mfname);
+	sp->mfname = NULL;
 	free(sp->thisgroup);
 	sp->thisgroup = NULL;
 	free(sp->thishost);
 	sp->thishost = NULL;
+	free(sp->thisname);
+	sp->thisname = NULL;
 }
diff --git a/server/config.c b/server/config.c
index ff4d876..293a5dd 100644
--- a/server/config.c
+++ b/server/config.c
@@ -224,6 +224,16 @@ static void cfg_elm_end (GMarkupParseContext *context,
 		cc->text = NULL;
 	}
 
+	else if (!strcmp(element_name, "TDBRepName")) {
+		if (!cc->text) {
+			applog(LOG_WARNING, "TDBRepName element empty");
+			return;
+		}
+		free(tabled_srv.rep_name);
+		tabled_srv.rep_name = cc->text;
+		cc->text = NULL;
+	}
+
 	else if (!strcmp(element_name, "StatusPort")) {
 		if (!cc->text) {
 			applog(LOG_WARNING, "StatusPort element empty");
diff --git a/server/metarep.c b/server/metarep.c
new file mode 100644
index 0000000..d3eec49
--- /dev/null
+++ b/server/metarep.c
@@ -0,0 +1,1245 @@
+
+/*
+ * Copyright 2008-2009 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING.  If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define _GNU_SOURCE
+#include "tabled-config.h"
+
+#include <sys/types.h>
+#include <sys/ioctl.h>
+#include <stddef.h>
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <syslog.h>
+#include <glib.h>
+#include <tdb.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include "tabled.h"
+
+/* #define offsetof(type, member)	\
+	(((unsigned char *)&((type *)0)->member) - (unsigned char *)0) */
+
+/*
+ * flags:
+ *   <31:28>  version (currently 1)
+ *   <27:8>   unused
+ *    <7:0>   rep_msg_type
+ */
+enum rep_msg_type {  REP_MSG_NOP, REP_MSG_LOGIN, REP_MSG_LOGOK, REP_MSG_DATA };
+struct rep_msg_hdr {
+	unsigned int	flags;
+	unsigned int	lenctl;
+	unsigned int	lendata;
+	unsigned short	dst, src;
+};
+
+/*
+ * The naming convention is to identify the context in which the function runs.
+ */
+static int rtdb_master_login_reply(struct db_conn *dbc, unsigned char *msgbuf);
+
+/*
+ * Note that the invalid dbid is zero, not -1.
+ */
+static int make_remote_id(void)
+{
+	int id;
+
+	for (;;) {
+		id = rand() % (DBID_MAX+1 - DBID_MIN) + DBID_MIN;
+		if (!tdb_find_remote_byid(id))
+			return id;
+	}
+}
+
+static int dbl_init(struct db_link *dbl)
+{
+	dbl->fd = -1;
+	dbl->state = DBC_INIT;
+
+	dbl->obuflen = 500;
+	dbl->obuf = malloc(dbl->obuflen);
+	if (!dbl->obuf)
+		return -1;
+
+	dbl->ibuflen = sizeof(struct rep_msg_hdr);
+	dbl->ibuf = malloc(dbl->ibuflen);
+	if (!dbl->ibuf) {
+		free(dbl->obuf);
+		return -1;
+	}
+	dbl->cnt = 0;
+	dbl->explen = 1;
+
+	return 0;
+}
+
+static int dbl_irealloc(struct db_link *dbl, int len)
+{
+	unsigned char *newbuf;
+
+	if (len > dbl->ibuflen) {
+		if (!(newbuf = malloc(len)))
+			return -1;
+		memcpy(newbuf, dbl->ibuf, dbl->ibuflen);
+		free(dbl->ibuf);
+		dbl->ibuf = newbuf;
+		dbl->ibuflen = len;
+	}
+	return 0;
+}
+
+/*
+ * Read toward dbl->explen bytes; return accumulated dbl->cnt, -1 on error/EOF.
+ */
+static int dbl_expect(struct db_link *dbl)
+{
+	int rc;
+
+	rc = read(dbl->fd, dbl->ibuf + dbl->cnt, dbl->explen - dbl->cnt);
+	if (rc < 0) {
+		if (errno == EAGAIN || errno == EINTR)
+			return dbl->cnt;
+		applog(LOG_ERR, "network read: %s", strerror(errno));
+		return -1;
+	}
+	if (rc == 0) {
+		applog(LOG_ERR, "EOF from peer"); /* P3 */
+		return -1;
+	}
+	dbl->cnt += rc;
+	return dbl->cnt;
+}
+
+static int dbl_hdr_validate(struct rep_msg_hdr *hdr, int thisid)
+{
+	unsigned int msgflags;
+	int srcid, dstid;
+
+	msgflags = GUINT32_FROM_BE(hdr->flags);
+	if ((msgflags >> 28) != 1) {
+		applog(LOG_ERR, "Link: bad protocol, flags 0x%08x", msgflags);
+		return -1;
+	}
+
+	srcid = GUINT16_FROM_BE(hdr->src);
+	dstid = GUINT16_FROM_BE(hdr->dst);
+	if (srcid == dstid) {
+		applog(LOG_ERR, "Link: loopback, dbid %d", dstid);
+		return -1;
+	}
+	if (srcid < DBID_MIN || srcid > DBID_MAX) {
+		applog(LOG_ERR, "Link: bad src dbid %d", srcid);
+		return -1;
+	}
+	if (dstid < DBID_MIN || dstid > DBID_MAX) {
+		applog(LOG_ERR, "Link: bad dst dbid %d", dstid);
+		return -1;
+	}
+
+	if (thisid != 0 && dstid != thisid) {
+		applog(LOG_ERR, "Link: misdirected, dst dbid %d our dbid %d",
+		       dstid, thisid);
+		return -1;
+	}
+
+	return 0;
+}
+
+/*
+ * Login message is different in two ways:
+ *  - src and/or dst may not be set
+ *  - lenctl is the dst name, lendata is the src name
+ *    (but contents not available for validation)
+ */
+static int dbl_hdr_validate_login(struct rep_msg_hdr *hdr, int thisid)
+{
+	unsigned int msgflags;
+	unsigned int len;
+
+	msgflags = GUINT32_FROM_BE(hdr->flags);
+	if ((msgflags >> 28) != 1) {
+		applog(LOG_ERR, "Link: bad protocol, flags 0x%08x", msgflags);
+		return -1;
+	}
+	if ((msgflags & 0xff) != REP_MSG_LOGIN) {
+		applog(LOG_ERR, "Link: bad login request, flags 0x%08x",
+		       msgflags);
+		return -1;
+	}
+
+#if 0 /* A bad idea as long as names are the persistent identifiers. */
+	int dstid;
+	dstid = GUINT16_FROM_BE(hdr->dst);
+	if (dstid && dstid != thisid) {
+		applog(LOG_ERR, "Link: login to wrong dbid %d", dstid);
+		return -1;
+	}
+#endif
+
+	len = GUINT32_FROM_BE(hdr->lenctl);
+	if (len == 0 || len > 64) {
+		applog(LOG_ERR, "Link: bad login dst len %u", len);
+		return -1;
+	}
+
+	len = GUINT32_FROM_BE(hdr->lendata);
+	if (len == 0 || len > 64) {
+		applog(LOG_ERR, "Link: bad login src len %u", len);
+		return -1;
+	}
+
+	return 0;
+}
+
+static void dbl_fini(struct db_link *dbl)
+{
+	if (dbl->writing) {
+		event_del(&dbl->wrev);
+		dbl->writing = false;
+	}
+	if (dbl->fd >= 0) {
+		event_del(&dbl->rcev);
+		close(dbl->fd);
+	}
+	if (dbl->ibuf)
+		free(dbl->ibuf);
+	if (dbl->obuf)
+		free(dbl->obuf);
+}
+
+static struct db_conn *tdb_find_byid(struct tablerep *rtdb, int id)
+{
+	struct db_conn *dbc;
+
+	list_for_each_entry(dbc, &rtdb->conns, link) {
+		if (dbc->remote && dbc->remote->dbid == id)
+			return dbc;
+	}
+	return NULL;
+}
+
+static struct db_conn *dbc_alloc(struct tablerep *rtdb, struct db_remote *rem)
+{
+	struct db_conn *dbc;
+
+	dbc = malloc(sizeof(*dbc));
+	if (!dbc)
+		goto out_mem;
+	memset(dbc, 0, sizeof(*dbc));
+	dbc->rtdb = rtdb;
+	dbc->remote = rem;
+	if (dbl_init(&dbc->lk))
+		goto out_dbl;
+	return dbc;
+
+ out_dbl:
+	free(dbc);
+ out_mem:
+	return NULL;
+}
+
+static void dbc_free(struct db_conn *dbc)
+{
+	dbl_fini(&dbc->lk);
+	free(dbc);
+}
+
+/*
+ * The dbc->remote is known here, see callers.
+ *
+ * The db4 code assumes that it is all right to block when sending. Of course
+ * in our case that means blocking the whole (single-threaded) server.
+ * It is also all right to drop messages, which is said to hurt performance
+ * in other ways. Still, as long as tabled is single-threaded we have no choice.
+ *
+ * Since we can only send complete messages, and even blocking sockets can
+ * return short writes, we must buffer output. But we do not create any
+ * additional queues beyond what is required for the atomicity.
+ */
+static int tdb_rep_send(struct tablerep *rtdb, struct db_link *dbl,
+			int dstid, const DBT *ctl, const DBT *rec,
+			bool easydrop)
+{
+	unsigned char *p;
+	struct rep_msg_hdr *hdr;
+	unsigned int msgflags;
+	ssize_t len;
+	ssize_t rc;
+
+	if (dbl->togo) {
+		/* Maybe poke the output here? Should not be necessary. */
+		return 1;
+	}
+
+	len = sizeof(struct rep_msg_hdr) + ctl->size + rec->size;
+	if (dbl->obuflen < len) {
+		free(dbl->obuf);
+		dbl->obuflen = 0;
+		dbl->obuf = malloc(len);
+		if (!dbl->obuf) {
+			applog(LOG_WARNING, "No core (%ld)", (long) len);
+			return -1;
+		}
+		dbl->obuflen = len;
+	}
+
+	hdr = (struct rep_msg_hdr *) dbl->obuf;
+	p = dbl->obuf;
+
+	memset(hdr, 0, sizeof(struct rep_msg_hdr));
+	msgflags = (1 << 28) | (REP_MSG_DATA);
+	hdr->flags = GUINT32_TO_BE(msgflags);
+	hdr->dst = GUINT16_TO_BE((unsigned short)dstid);
+	hdr->src = GUINT16_TO_BE((unsigned short)rtdb->thisid);
+	p += sizeof(struct rep_msg_hdr);
+	if (ctl->size) {
+		hdr->lenctl = GUINT32_TO_BE(ctl->size);
+		memcpy(p, ctl->data, ctl->size);
+		p += ctl->size;
+	}
+	if (rec->size) {
+		hdr->lendata = GUINT32_TO_BE(rec->size);
+		memcpy(p, rec->data, rec->size);
+		p += rec->size;
+	}
+
+	dbl->done = 0;
+	dbl->togo = p - dbl->obuf;
+
+	rc = write(dbl->fd, dbl->obuf + dbl->done, dbl->togo);
+	if (rc < 0) {
+		dbl->done = 0;
+		dbl->togo = 0;
+		applog(LOG_ERR, "socket write error, peer dbid %d: %s",
+		       dstid, strerror(errno));
+		return -1;
+	}
+	if (rc < dbl->togo) {
+		if (!dbl->writing) {
+			if (event_add(&dbl->wrev, NULL))
+				applog(LOG_ERR, "event_add failed (write)");
+			else
+				dbl->writing = true;
+		}
+	}
+	dbl->done += rc;
+	dbl->togo -= rc;
+	return 0;
+}
+
+static int db4_rep_send(DB_ENV *dbenv, const DBT *ctl, const DBT *rec,
+			const DB_LSN *lsnp, int envid, uint32_t flags)
+{
+	struct tablerep *rtdb;
+	struct db_conn *dbc;
+	int cnt;
+	int rc;
+
+	rtdb = (struct tablerep *)
+		((char *)dbenv->app_private - offsetof(struct tablerep, tdb));
+
+	if (envid == DB_EID_BROADCAST) {
+		cnt = 0;
+		list_for_each_entry(dbc, &rtdb->conns, link) {
+			if (dbc->lk.state == DBC_OPEN) {
+				rc = tdb_rep_send(rtdb, &dbc->lk,
+						  dbc->remote->dbid,
+						  ctl, rec, true);
+				if (!rc)
+					cnt++;
+				if (rc < 0)
+					dbc->lk.state = DBC_DEAD;
+			}
+		}
+		if (!cnt)
+			return DB_REP_UNAVAIL;
+	} else {
+		dbc = tdb_find_byid(rtdb, envid);
+		if (dbc && dbc->lk.state == DBC_OPEN) {
+			rc = tdb_rep_send(rtdb, &dbc->lk,
+					  dbc->remote->dbid, ctl, rec, false);
+			if (rc < 0) {
+				dbc->lk.state = DBC_DEAD;
+				return DB_REP_UNAVAIL;
+			}
+			if (rc)
+				return DB_REP_UNAVAIL;
+		} else {
+			applog(LOG_INFO, "Send: dbid %d not found", envid);
+			return DB_REP_UNAVAIL;
+		}
+	}
+	return 0;
+}
+
+static int rtdb_process(struct db_conn *dbc, unsigned char *msgbuf)
+{
+	struct rep_msg_hdr *hdr = (struct rep_msg_hdr *) msgbuf;
+	DB_ENV *dbenv = dbc->rtdb->tdb.env;
+	DBT pctl, prec;
+	DB_LSN lsn;
+	struct db_remote *peer;
+	int rc;
+
+	peer = tdb_find_remote_byid(GUINT16_FROM_BE(hdr->src));
+	if (!peer) {
+		applog(LOG_INFO, "Unknown peer dbid %d",
+		       GUINT16_FROM_BE(hdr->src));
+		return -1;
+	}
+
+	memset(&pctl, 0, sizeof(pctl));
+	pctl.data = msgbuf + sizeof(struct rep_msg_hdr);
+	pctl.size = GUINT32_FROM_BE(hdr->lenctl);
+	memset(&prec, 0, sizeof(prec));
+	prec.data = pctl.data + pctl.size;
+	prec.size = GUINT32_FROM_BE(hdr->lendata);
+	rc = dbenv->rep_process_message(dbenv, &pctl, &prec, peer->dbid, &lsn);
+	switch (rc) {
+	case DB_REP_ISPERM:
+		/*
+		 * The "record is written" is normal in db4 operations,
+		 * and shows up so much that we do not print it even under
+		 * if (debugging).
+		 */
+		break;
+	case DB_REP_DUPMASTER:		/* DB thinks we have 2 */
+	case DB_REP_HANDLE_DEAD:	/* what handle? */
+	case DB_REP_HOLDELECTION:	/* maybe just rep_init it */
+	case DB_REP_IGNORE:		/* well, whatever */
+	case DB_REP_JOIN_FAILURE:
+	case DB_REP_LEASE_EXPIRED:
+	case DB_REP_LOCKOUT:
+	case DB_REP_NEWSITE:
+	case DB_REP_NOTPERM:
+	case DB_REP_UNAVAIL:
+	default:
+		if (rc) {
+			applog(LOG_INFO, "rep_process_message: %d (%s)",
+			       rc, db_strerror(rc));
+		}
+	}
+
+	return 0;
+}
+
+static int rtdb_send_more(struct db_link *dbl)
+{
+	ssize_t rc;
+
+	if (!dbl->togo) {
+ /* P3 */ applog(LOG_INFO, "stray write event");
+		event_del(&dbl->wrev);
+		dbl->writing = false;
+		return 0;
+	}
+
+	rc = write(dbl->fd, dbl->obuf + dbl->done, dbl->togo);
+	if (rc < 0) {
+		applog(LOG_ERR, "socket write error: %s", strerror(errno));
+		dbl->done = 0;
+		dbl->togo = 0;
+		return -1;
+	}
+	if (rc < dbl->togo) {
+		dbl->done += rc;
+		dbl->togo -= rc;
+		if (!dbl->writing) {
+			if (event_add(&dbl->wrev, NULL))
+				applog(LOG_ERR, "event_add failed (write)");
+			else
+				dbl->writing = true;
+		}
+	} else {
+		dbl->done = 0;
+		dbl->togo = 0;
+		if (dbl->writing) {
+			event_del(&dbl->wrev);
+			dbl->writing = false;
+		}
+	}
+	return 0;
+}
+
+static void rtdb_wr_event(int fd, short events, void *userdata)
+{
+	struct db_link *dbl = userdata;
+
+	if (rtdb_send_more(dbl))
+		dbl->state = DBC_DEAD;
+}
+
+static void rtdb_master_tcp_event(int fd, short events, void *userdata)
+{
+	struct db_conn *dbc = userdata;
+	struct rep_msg_hdr *hdr;
+	unsigned msgflags;
+	int ctllen, reclen;
+	int len;
+	int rc;
+
+	switch (dbc->lk.state) {
+	case DBC_LOGIN:
+		rc = dbl_expect(&dbc->lk);
+		if (rc < 0)
+			goto out_bad_dbc;
+		if (rc < dbc->lk.explen)
+			return;
+
+		if (dbc->lk.explen == sizeof(struct rep_msg_hdr)) {
+			hdr = (struct rep_msg_hdr *) dbc->lk.ibuf;
+			if (dbl_hdr_validate_login(hdr, dbc->rtdb->thisid))
+				goto out_bad_dbc;
+
+			ctllen = GUINT32_FROM_BE(hdr->lenctl);
+			reclen = GUINT32_FROM_BE(hdr->lendata);
+			len = sizeof(struct rep_msg_hdr) + ctllen + reclen;
+			if (dbl_irealloc(&dbc->lk, len) < 0) {
+				applog(LOG_ERR, "No core (%d)", len);
+				goto out_bad_dbc;
+			}
+			dbc->lk.explen = len;
+		} else {
+			if (rtdb_master_login_reply(dbc, dbc->lk.ibuf))
+				goto out_bad_dbc;
+
+			dbc->lk.state = DBC_OPEN;
+			dbc->lk.cnt = 0;
+			dbc->lk.explen = sizeof(struct rep_msg_hdr);
+		}
+		break;
+	case DBC_OPEN:
+		rc = dbl_expect(&dbc->lk);
+		if (rc < 0)
+			goto out_bad_dbc;
+		if (rc < dbc->lk.explen)
+			return;
+
+		if (dbc->lk.explen == sizeof(struct rep_msg_hdr)) {
+			hdr = (struct rep_msg_hdr *) dbc->lk.ibuf;
+			if (dbl_hdr_validate(hdr, dbc->rtdb->thisid))
+				goto out_bad_dbc;
+			msgflags = GUINT32_FROM_BE(hdr->flags);
+			if ((msgflags & 0xff) != REP_MSG_DATA) {
+				applog(LOG_ERR,
+				       "Bad data message, flags 0x%08x",
+				       msgflags);
+				goto out_bad_dbc;
+			}
+
+			ctllen = GUINT32_FROM_BE(hdr->lenctl);
+			reclen = GUINT32_FROM_BE(hdr->lendata);
+			len = sizeof(struct rep_msg_hdr) + ctllen + reclen;
+			if (dbl_irealloc(&dbc->lk, len) < 0) {
+				applog(LOG_ERR, "No core (%d)", len);
+				goto out_bad_dbc;
+			}
+			dbc->lk.explen = len;
+		} else {
+			if (rtdb_process(dbc, dbc->lk.ibuf))
+				goto out_bad_dbc;
+
+			dbc->lk.state = DBC_OPEN;
+			dbc->lk.cnt = 0;
+			dbc->lk.explen = sizeof(struct rep_msg_hdr);
+		}
+		break;
+	default: // DBC_DEAD
+		if (dbc->remote) {
+			applog(LOG_INFO,
+			       "Event on a dead slave socket, slave %s",
+			       dbc->remote->host);
+		} else {
+			applog(LOG_INFO,
+			       "Event on a dead slave socket");
+		}
+		tdb_conn_scrub_cb();
+	}
+	return;
+
+ out_bad_dbc:
+	dbc->lk.state = DBC_DEAD;
+	tdb_conn_scrub_cb();
+	return;
+}
+
+static void tdb_conn_event(int fd, short events, void *userdata)
+{
+	struct tablerep *rtdb = userdata;
+	struct db_conn *dbc;
+	struct sockaddr_in6 addr;
+	socklen_t addrlen;
+	char host[65], port[15];
+
+	dbc = dbc_alloc(rtdb, NULL);
+	if (!dbc)
+		goto out_dbc;
+	dbc->lk.explen = sizeof(struct rep_msg_hdr);
+	dbc->lk.state = DBC_LOGIN;
+
+	addrlen = sizeof(addr);
+	dbc->lk.fd = accept(fd, (struct sockaddr *) &addr, &addrlen);
+	if (dbc->lk.fd < 0) {
+		applog(LOG_ERR, "accept: %s", strerror(errno));
+		goto out_accept;
+	}
+
+	getnameinfo((struct sockaddr *) &addr, addrlen,
+		    host, sizeof(host), port, sizeof(port),
+		    NI_NUMERICHOST|NI_NUMERICSERV);
+	applog(LOG_INFO, "db slave host %s port %s", host, port);
+
+	if (fcntl(dbc->lk.fd, F_SETFL, O_NONBLOCK) < 0) {
+		applog(LOG_ERR, "fcntl: %s", strerror(errno));
+		goto out_flags;
+	}
+
+	event_set(&dbc->lk.rcev, dbc->lk.fd, EV_READ | EV_PERSIST,
+		  rtdb_master_tcp_event, dbc);
+	event_set(&dbc->lk.wrev, dbc->lk.fd, EV_WRITE | EV_PERSIST,
+		  rtdb_wr_event, &dbc->lk);
+	if (event_add(&dbc->lk.rcev, NULL) < 0) {
+		applog(LOG_ERR, "event_add failed");
+		goto out_add;
+	}
+	list_add_tail(&dbc->link, &rtdb->conns);
+	return;
+
+ out_add:
+ out_flags:
+	close(dbc->lk.fd);
+ out_accept:
+	dbc_free(dbc);
+ out_dbc:
+	return;
+}
+
+static int tdb_rep_listen_open(struct sockaddr_in *addr, int addr_len)
+{
+	int fd;
+	int on;
+	int rc;
+
+	fd = socket(addr->sin_family, SOCK_STREAM, 0);
+	if (fd < 0)
+		return -errno;
+
+	on = 1;
+	if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
+		rc = -errno;
+		goto out_err;
+	}
+
+	if (bind(fd, (struct sockaddr *)addr, addr_len) < 0) {
+		rc = -errno;
+		goto out_err;
+	}
+
+	// rc = fsetflags("tcp server", fd, O_NONBLOCK);
+	// if (rc) {
+	// 	rc = -errno;
+	// 	goto out_err;
+	// }
+
+	if (listen(fd, 100) < 0) {
+		rc = -errno;
+		goto out_err;
+	}
+
+	return fd;
+
+ out_err:
+	close(fd);
+	return rc;
+}
+
+static int rtdb_rep_listen(struct tablerep *rtdb, unsigned short port)
+{
+	struct sockaddr_in addr4;
+	struct sockaddr_in6 addr6;
+	int rc;
+
+	memset(&addr6, 0, sizeof(addr6));
+	addr6.sin6_family = AF_INET6;
+	addr6.sin6_port = htons(port);
+	memcpy(&addr6.sin6_addr, &in6addr_any, sizeof(struct in6_addr));
+	rc = tdb_rep_listen_open((struct sockaddr_in *)&addr6, sizeof(addr6));
+	if (rc < 0) {
+		if (debugging)
+			applog(LOG_DEBUG,
+			       "tdb_rep_listen_open(v6, %u) failed: %s",
+			       port, strerror(-rc));
+	} else {
+		rtdb->sockfd6 = rc;
+		event_set(&rtdb->lsev6, rtdb->sockfd6, EV_READ | EV_PERSIST,
+			  tdb_conn_event, rtdb);
+		if (event_add(&rtdb->lsev6, NULL) < 0)
+			applog(LOG_ERR, "event_add failed");
+	}
+
+	memset(&addr4, 0, sizeof(addr4));
+	addr4.sin_family = AF_INET;
+	addr4.sin_port = htons(port);
+	addr4.sin_addr.s_addr = htonl(INADDR_ANY);
+	rc = tdb_rep_listen_open((struct sockaddr_in *)&addr4, sizeof(addr4));
+	if (rc < 0) {
+		if (debugging)
+			applog(LOG_DEBUG,
+			       "tdb_rep_listen_open(v4, %u) failed: %s",
+			       port, strerror(-rc));
+	} else {
+		rtdb->sockfd4 = rc;
+		event_set(&rtdb->lsev4, rtdb->sockfd4, EV_READ | EV_PERSIST,
+			  tdb_conn_event, rtdb);
+		if (event_add(&rtdb->lsev4, NULL) < 0)
+			applog(LOG_ERR, "event_add failed");
+	}
+
+	return 0;
+}
+
+static void rtdb_slave_tcp_event(int fd, short events, void *userdata)
+{
+	struct db_conn *dbc = userdata;
+	struct tablerep *rtdb = dbc->rtdb;
+	struct rep_msg_hdr *hdr;
+	unsigned msgflags;
+	int srcid, dstid;
+	int ctllen, reclen;
+	int len;
+	int rc;
+
+	switch (dbc->lk.state) {
+	case DBC_LOGIN:
+		rc = dbl_expect(&dbc->lk);
+		if (rc < 0)
+			goto out_bad_dbc;
+		if (rc < dbc->lk.explen)
+			return;
+
+		hdr = (struct rep_msg_hdr *) dbc->lk.ibuf;
+		if (dbl_hdr_validate(hdr, rtdb->thisid))
+			goto out_bad_dbc;
+		msgflags = GUINT32_FROM_BE(hdr->flags);
+		if ((msgflags & 0xff) != REP_MSG_LOGOK) {
+			applog(LOG_ERR, "Bad login reply, flags 0x%08x",
+			       msgflags);
+			goto out_bad_dbc;
+		}
+		srcid = GUINT16_FROM_BE(hdr->src);
+		dstid = GUINT16_FROM_BE(hdr->dst);
+
+		if (rtdb->thisid == 0) {
+			applog(LOG_INFO, "Assigned local dbid %d", dstid);
+		} else {
+			if (rtdb->thisid != dstid) {
+				/*
+				 * Oracle people posted that db won't like this,
+				 * but what can we do. At worst, blow away the
+				 * local db on slave by hand and let it resync.
+				 */
+				applog(LOG_INFO,
+				       "Reassigned local dbid from %d to %d",
+				       rtdb->thisid, dstid);
+				rtdb->thisid = dstid;
+			}
+		}
+		rtdb->thisid = dstid;
+
+		dbc->lk.state = DBC_OPEN;
+		dbc->lk.cnt = 0;
+		dbc->lk.explen = sizeof(struct rep_msg_hdr);
+
+		if (tdb_slave_login_cb(srcid))
+			goto out_bad_dbc;
+		break;
+	case DBC_OPEN:
+		rc = dbl_expect(&dbc->lk);
+		if (rc < 0)
+			goto out_bad_dbc;
+		if (rc < dbc->lk.explen)
+			return;
+
+		if (dbc->lk.explen == sizeof(struct rep_msg_hdr)) {
+			hdr = (struct rep_msg_hdr *) dbc->lk.ibuf;
+			if (dbl_hdr_validate(hdr, rtdb->thisid))
+				goto out_bad_dbc;
+			msgflags = GUINT32_FROM_BE(hdr->flags);
+			if ((msgflags & 0xff) != REP_MSG_DATA) {
+				applog(LOG_ERR,
+				       "Bad data message, flags 0x%08x",
+				       msgflags);
+				goto out_bad_dbc;
+			}
+
+			ctllen = GUINT32_FROM_BE(hdr->lenctl);
+			reclen = GUINT32_FROM_BE(hdr->lendata);
+			len = sizeof(struct rep_msg_hdr) + ctllen + reclen;
+			if (dbl_irealloc(&dbc->lk, len) < 0) {
+				applog(LOG_ERR, "No core (%d)", len);
+				goto out_bad_dbc;
+			}
+			dbc->lk.explen = len;
+		} else {
+			if (rtdb_process(dbc, dbc->lk.ibuf))
+				goto out_bad_dbc;
+
+			dbc->lk.state = DBC_OPEN;
+			dbc->lk.cnt = 0;
+			dbc->lk.explen = sizeof(struct rep_msg_hdr);
+		}
+		break;
+	case DBC_DEAD:
+		tdb_slave_disc_cb();
+		break;
+	default:
+		/* P3 */ applog(LOG_INFO, "Event on an unready socket");
+	}
+	return;
+
+ out_bad_dbc:
+	dbc->lk.state = DBC_DEAD;
+	tdb_conn_scrub_cb();
+	return;
+}
+
+static int rtdb_master_login_reply(struct db_conn *dbc, unsigned char *msgbuf)
+{
+	struct tablerep *rtdb = dbc->rtdb;
+	struct rep_msg_hdr *hdr = (struct rep_msg_hdr *) msgbuf;
+	int srclen, dstlen;
+	char *srcname, *dstname;
+	struct db_conn *tmp;
+	struct db_remote *slave;
+	int newid;
+	struct rep_msg_hdr hdrb;
+	unsigned int msgflags;
+	int rc;
+
+	/*
+	 * Before proceeding, extract and zero-terminate src and dst names.
+	 */
+	dstlen = GUINT32_FROM_BE(hdr->lenctl);
+	dstname = malloc(dstlen + 1);
+	if (!dstname) {
+		applog(LOG_ERR, "No core");
+		return -1;
+	}
+	memcpy(dstname, msgbuf + sizeof(struct rep_msg_hdr), dstlen);
+	dstname[dstlen] = 0;
+
+	srclen = GUINT32_FROM_BE(hdr->lendata);
+	srcname = malloc(srclen + 1);
+	if (!srcname) {
+		applog(LOG_ERR, "No core");
+		free(dstname);
+		return -1;
+	}
+	memcpy(srcname, msgbuf + sizeof(struct rep_msg_hdr) + dstlen, srclen);
+	srcname[srclen] = 0;
+
+	if (dbc->remote) {
+		/* Never happens even with bad clients, our internal problem. */
+		applog(LOG_ERR, "Redone login for slave %s (src %s)",
+		       dbc->remote->host, srcname);
+		goto out_err;
+	}
+
+	if (strcmp(srcname, rtdb->thisname) == 0) {
+		applog(LOG_ERR, "Login from aliasing slave %s", srcname);
+		goto out_err;
+	}
+
+	slave = tdb_find_remote_byname(srcname);
+	if (!slave) {
+		applog(LOG_INFO, "Unknown slave \"%s\"", srcname);
+		goto out_err;
+	}
+
+	if (slave->dbid == DBID_NONE) {
+		newid = GUINT16_FROM_BE(hdr->src);
+		if (newid == 0 || newid < DBID_MIN || newid > DBID_MAX) {
+			newid = make_remote_id();
+		}
+		slave->dbid = newid;
+	}
+	if (debugging)
+		applog(LOG_DEBUG, "Link login, slave %s dbid %d",
+		       slave->host, slave->dbid);
+
+	/*
+	 * Dispose of all existing connections. Our current implementation
+	 * provides no security, so it is a proper thing to do. We assume
+	 * that the slave knows what it's doing, maybe it detected a loss
+	 * of TCP connection that we missed.
+	 */
+	list_for_each_entry(tmp, &rtdb->conns, link) {
+		if (tmp->remote == slave)
+			tmp->lk.state = DBC_DEAD;
+	}
+
+	dbc->remote = slave;
+
+	memset(&hdrb, 0, sizeof(hdrb));
+	msgflags = (1 << 28) | (REP_MSG_LOGOK);
+	hdrb.flags = GUINT32_TO_BE(msgflags);
+	hdrb.dst = GUINT16_TO_BE((unsigned short)slave->dbid);
+	hdrb.src = GUINT16_TO_BE((unsigned short)rtdb->thisid);
+
+	rc = write(dbc->lk.fd, &hdrb, sizeof(hdrb));
+	if (rc < 0) {
+		applog(LOG_INFO, "Write error to peer %s: %s", slave->host,
+		       strerror(errno));
+		goto out_err;
+	}
+	if (rc < sizeof(hdrb)) {
+		applog(LOG_INFO, "Write short to peer %s: %d", slave->host, rc);
+		goto out_err;
+	}
+	free(srcname);	/* names are only needed during login */
+	free(dstname);
+	return 0;
+ out_err:
+	free(srcname);
+	free(dstname);
+	return -1;
+}
+
+static int rtdb_slave_login(struct db_conn *dbc)
+{
+	struct rep_msg_hdr *hdr;
+	unsigned char *msgbuf;
+	unsigned int msgflags;
+	int dstlen, srclen;
+	int len;
+
+	dstlen = strlen(dbc->remote->host);
+	srclen = strlen(dbc->rtdb->thisname);
+	len = sizeof(struct rep_msg_hdr) + dstlen + srclen;
+	msgbuf = malloc(len);
+	if (!msgbuf)
+		return -1;
+
+	hdr = (struct rep_msg_hdr *) msgbuf;
+	// memset(hdr, 0, sizeof(struct rep_msg_hdr));  /* no holes */
+	msgflags = (1 << 28) | (REP_MSG_LOGIN);
+	hdr->flags = GUINT32_TO_BE(msgflags);
+	hdr->lenctl = GUINT32_TO_BE(dstlen);
+	hdr->lendata = GUINT32_TO_BE(srclen);
+	hdr->dst = GUINT16_TO_BE((unsigned short)dbc->remote->dbid);
+	hdr->src = GUINT16_TO_BE((unsigned short)dbc->rtdb->thisid);
+	memcpy(msgbuf + sizeof(struct rep_msg_hdr), dbc->remote->host, dstlen);
+	memcpy(msgbuf + sizeof(struct rep_msg_hdr) + dstlen,
+	       dbc->rtdb->thisname, srclen);
+
+	if (write(dbc->lk.fd, msgbuf, len) < len) {
+		dbc->lk.state = DBC_DEAD;
+		free(msgbuf);
+		return -1;
+	}
+	dbc->lk.state = DBC_LOGIN;
+	dbc->lk.explen = sizeof(struct rep_msg_hdr);
+	dbc->lk.cnt = 0;
+	free(msgbuf);
+	return 0;
+}
+
+static int tdb_rep_resolve(struct tablerep *rtdb, int *family,
+			   int addrsize, unsigned char *addr, int *addrlen,
+			   const char *hostname, unsigned short port)
+{
+	char portstr[15];
+	struct addrinfo hints;
+	struct addrinfo *res, *res0;
+	int rc;
+
+	snprintf(portstr, sizeof(portstr), "%u", port);
+
+	memset(&hints, 0, sizeof(struct addrinfo));
+	hints.ai_family = PF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+
+	rc = getaddrinfo(hostname, portstr, &hints, &res0);
+	if (rc) {
+		applog(LOG_WARNING, "getaddrinfo(%s:%s) failed: %s",
+		       hostname, portstr, gai_strerror(rc));
+		return -1;
+	}
+
+	for (res = res0; res; res = res->ai_next) {
+		if (res->ai_family != AF_INET && res->ai_family != AF_INET6)
+			continue;
+
+		if (res->ai_addrlen > addrsize)		/* should not happen */
+			continue;
+
+		memcpy(addr, res->ai_addr, res->ai_addrlen);
+		*addrlen = res->ai_addrlen;
+		*family = res->ai_family;
+
+		freeaddrinfo(res0);
+		return 0;
+	}
+
+	freeaddrinfo(res0);
+
+	applog(LOG_WARNING, "getaddrinfo(%s:%s): nothing suitable",
+	       hostname, portstr);
+	return -1;
+}
+
+static int rtdb_rep_connect(struct db_conn *dbc)
+{
+	struct db_link *dbl = &dbc->lk;
+	struct db_remote *master = dbc->remote;
+	int family;
+	unsigned char addr[32];
+	int addrlen;
+	int rc;
+
+	rc = tdb_rep_resolve(dbc->rtdb, &family, sizeof(addr), addr, &addrlen,
+			     master->host, master->port);
+	if (rc < 0)
+		return -1;
+
+	rc = socket(family, SOCK_STREAM, 0);
+	if (rc < 0) {
+		applog(LOG_WARNING, "socket: %s", strerror(errno));
+		return -1;
+	}
+	dbl->fd = rc;
+
+	if (connect(dbl->fd, (struct sockaddr *)addr, addrlen)) {
+		applog(LOG_WARNING, "connect(host %s port %u): %s",
+		       master->host, master->port, strerror(errno));
+		close(dbl->fd);
+		return -1;
+	}
+
+	if (fcntl(dbl->fd, F_SETFL, O_NONBLOCK) < 0) {
+		applog(LOG_ERR, "fcntl: %s", strerror(errno));
+		close(dbl->fd);
+		return -1;
+	}
+
+	event_set(&dbl->rcev, dbl->fd, EV_READ | EV_PERSIST,
+		  rtdb_slave_tcp_event, dbc);
+	if (event_add(&dbl->rcev, NULL) < 0) {
+		applog(LOG_ERR, "event_add failed");
+		close(dbl->fd);
+		return -1;
+	}
+	event_set(&dbl->wrev, dbl->fd, EV_WRITE | EV_PERSIST,
+		  rtdb_wr_event, dbl);
+	return 0;
+}
+
+static void __rtdb_fini(struct tablerep *rtdb)
+{
+	struct db_conn *dbc;
+
+	if (rtdb->sockfd4 >= 0) {
+		event_del(&rtdb->lsev4);
+		close(rtdb->sockfd4);
+		rtdb->sockfd4 = -1;
+	}
+	if (rtdb->sockfd6 >= 0) {
+		event_del(&rtdb->lsev6);
+		close(rtdb->sockfd6);
+		rtdb->sockfd6 = -1;
+	}
+
+	while (!list_empty(&rtdb->conns)) {
+		dbc = list_entry(rtdb->conns.next, struct db_conn, link);
+		list_del(&dbc->link);
+		dbc_free(dbc);
+	}
+	rtdb->mdbc = NULL;
+}
+
+/*
+ * return:
+ *  -1 - there was an error, things are in disarray, must call __rtdb_fini.
+ *   0 - all is up, may call tdb_init if desired.
+ *   1 - not done yet, just return to dispatch.
+ */
+static int __rtdb_start(struct tablerep *rtdb, bool we_are_master,
+			struct db_remote *rep_master, unsigned short rep_port)
+{
+	struct db_conn *dbc;
+
+	if (we_are_master) {
+		if (rtdb->thisid == DBID_NONE)
+			rtdb->thisid = make_remote_id();
+		if (rtdb_rep_listen(rtdb, rep_port))
+			return -1;
+	} else {
+		if (!rep_master) {
+			applog(LOG_INFO, "No master yet"); /* P3 */
+			return -1;
+		}
+		if (!rtdb->mdbc) {
+			dbc = dbc_alloc(rtdb, rep_master);
+			if (!dbc)
+				return -1;
+			dbc->lk.explen = sizeof(struct rep_msg_hdr);
+			dbc->lk.state = DBC_INIT;
+			list_add_tail(&dbc->link, &rtdb->conns);
+			rtdb->mdbc = dbc;
+		}
+		switch (rtdb->mdbc->lk.state) {
+		case DBC_OPEN:
+			break;
+		case DBC_INIT:
+			if (rtdb_rep_connect(rtdb->mdbc))
+				return -1;
+			if (rtdb_slave_login(rtdb->mdbc))
+				return -1;
+			return 1;
+		case DBC_LOGIN:
+			/* P3 */ applog(LOG_INFO, "start: no answer");
+			return -1;
+		default:
+			/* P3 */ applog(LOG_INFO, "start: confusion (state %d)",
+						 rtdb->mdbc->lk.state);
+			return -1;
+		}
+	}
+	return 0;
+}
+
+int rtdb_init(struct tablerep *rtdb, const char *thisname)
+{
+	rtdb->thisname = thisname;
+
+	INIT_LIST_HEAD(&rtdb->conns);
+	rtdb->sockfd4 = -1;
+	rtdb->sockfd6 = -1;
+
+	// rtdb->mdbc = dbc_alloc(rtdb, NULL);
+	// if (!rtdb->mdbc)
+	// 	return -1;
+	// rtdb->mdbc->lk.explen = sizeof(struct rep_msg_hdr);
+	// rtdb->mdbc->lk.state = DBC_INIT;
+	// list_add_tail(&rtdb->mdbc.link, &rtdb->conns);
+	return 0;
+}
+
+int rtdb_start(struct tablerep *rtdb,
+	       const char *db_home,
+	       bool we_are_master,
+	       struct db_remote *rep_master, unsigned short rep_port,
+	       void (*cb)(enum db_event))
+{
+	int rc;
+
+	rc = __rtdb_start(rtdb, we_are_master, rep_master, rep_port);
+	if (rc < 0)
+		goto err_out;
+	if (rc > 0)
+		return 0;
+
+	/*
+	 * Note that we only get here if either we're master, or slave
+	 * and link is DBC_OPEN. In both cases rtdb->thisid must be set.
+	 */
+	if (rtdb->thisid == 0) {		/* never happens */
+		applog(LOG_WARNING, "Zero own dbid, master %d", we_are_master);
+		goto err_out;
+	}
+	if (tdb_init(&rtdb->tdb, db_home, NULL, "tabled", true,
+		     rtdb->thisid, db4_rep_send, we_are_master, cb)) {
+		goto err_out;
+	}
+	return 0;
+
+err_out:
+	__rtdb_fini(rtdb);
+	return -1;
+}
+
+void rtdb_mc_reset(struct tablerep *rtdb, bool we_are_master,
+		   struct db_remote *rep_master, unsigned short rep_port)
+{
+	int rc;
+
+	__rtdb_fini(rtdb);
+	rc = __rtdb_start(rtdb, we_are_master, rep_master, rep_port);
+	if (rc < 0) {
+		/*
+		 * If we failed to reconnect immediately, we do not retry.
+		 * This is because db4 has its own timeouts, so there's really
+		 * no point in doing anything else: we would only interfere.
+		 * From now on, rely on CLD to drive the attempts to reconnect.
+		 */
+		/* P3 */ applog(LOG_INFO, "failed to reconnect (%d)", rc);
+	}
+}
+
+void rtdb_dbc_scrub(struct tablerep *rtdb)
+{
+	struct db_conn *dbc, *tmp;
+
+	list_for_each_entry_safe(dbc, tmp, &rtdb->conns, link) {
+		if (dbc->lk.state == DBC_DEAD) {
+			/*
+			 * This printout is misleading, since every remote
+			 * may have several connections. But how to fix it?
+			 */
+			if (dbc->remote) {
+				applog(LOG_INFO, "Closing, peer %s",
+				       dbc->remote->host);
+			} else {
+				applog(LOG_INFO, "Closing");
+			}
+			if (dbc == rtdb->mdbc)
+				rtdb->mdbc = NULL;
+			list_del(&dbc->link);
+			dbc_free(dbc);
+		}
+	}
+}
+
+/*
+ * This wants to be both in here and in tdb.c. Problem.
+ */
+int rtdb_restart(struct tablerep *rtdb, bool we_are_master)
+{
+	DB_ENV *dbenv = rtdb->tdb.env;
+	unsigned int rep_flags;
+	int rc;
+
+	rep_flags = we_are_master ? DB_REP_MASTER : DB_REP_CLIENT;
+	rc = dbenv->rep_start(dbenv, NULL, rep_flags);
+	if (rc) {
+		dbenv->err(dbenv, rc, "rep_start(0x%x)", rep_flags);
+		return -1;
+	}
+	return 0;
+}
+
+void rtdb_fini(struct tablerep *rtdb)
+{
+	__rtdb_fini(rtdb);
+	tdb_fini(&rtdb->tdb);
+}
+
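
For anyone following the wire format that metarep.c uses: the sketch below packs and unpacks a header laid out like struct rep_msg_hdr, with the protocol version in flags<31:28>, the message type in flags<7:0>, and every field big-endian on the wire. It is an illustration only, not code from the patch. struct wire_hdr, hdr_pack() and the hdr_*() accessors are hypothetical names, fixed-width types stand in for the unsigned int/short fields, and htonl()/ntohl() stand in for the GLib GUINT32_TO_BE/GUINT32_FROM_BE macros.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <arpa/inet.h>	/* htonl, ntohl, htons, ntohs */

enum { REP_VERSION = 1 };
enum rep_msg_type { REP_MSG_NOP, REP_MSG_LOGIN, REP_MSG_LOGOK, REP_MSG_DATA };

/* Hypothetical fixed-width mirror of struct rep_msg_hdr (16 bytes, no holes). */
struct wire_hdr {
	uint32_t flags;		/* <31:28> version, <27:8> unused, <7:0> type */
	uint32_t lenctl;	/* length of the control part that follows */
	uint32_t lendata;	/* length of the record part after that */
	uint16_t dst, src;	/* destination and source dbid */
};

/* Fill a header in network byte order, the way a sender would. */
static void hdr_pack(struct wire_hdr *h, unsigned int type,
		     unsigned int dst, unsigned int src,
		     uint32_t lenctl, uint32_t lendata)
{
	memset(h, 0, sizeof(*h));
	h->flags = htonl(((uint32_t)REP_VERSION << 28) | (type & 0xff));
	h->lenctl = htonl(lenctl);
	h->lendata = htonl(lendata);
	h->dst = htons((uint16_t)dst);
	h->src = htons((uint16_t)src);
}

/* Receiver side: always convert from network order before comparing. */
static unsigned int hdr_version(const struct wire_hdr *h)
{
	return ntohl(h->flags) >> 28;
}

static unsigned int hdr_type(const struct wire_hdr *h)
{
	return ntohl(h->flags) & 0xff;
}

static unsigned int hdr_dst(const struct wire_hdr *h)
{
	return ntohs(h->dst);
}

static unsigned int hdr_src(const struct wire_hdr *h)
{
	return ntohs(h->src);
}

/* Total message size a reader must accumulate once the header is in. */
static size_t hdr_total_len(const struct wire_hdr *h)
{
	return sizeof(*h) + (size_t)ntohl(h->lenctl) + ntohl(h->lendata);
}
```

A receiver in this style would first read sizeof(struct wire_hdr) bytes, check hdr_version() and hdr_type(), and only then extend the expected length to hdr_total_len(), which is the two-step pattern the DBC_OPEN branches above follow with explen.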
diff --git a/server/object.c b/server/object.c
index f8e7b12..3801e94 100644
--- a/server/object.c
+++ b/server/object.c
@@ -39,7 +39,7 @@
 static int object_find(DB_TXN *txn, const char *bucket, const char *key,
 		       struct db_obj_ent *pobj)
 {
-	DB *objs = tdb.objs;
+	DB *objs = tdbrep.tdb.objs;
 	struct db_obj_key *okey;
 	size_t alloc_len;
 	DBT pkey, pval;
@@ -72,7 +72,7 @@ static int object_find(DB_TXN *txn, const char *bucket, const char *key,
 
 static bool __object_del(DB_TXN *txn, const char *bucket, const char *key)
 {
-	DB *objs = tdb.objs;
+	DB *objs = tdbrep.tdb.objs;
 	struct db_obj_key *okey;
 	size_t okey_len;
 	DBT pkey;
@@ -100,7 +100,7 @@ static bool __object_del(DB_TXN *txn, const char *bucket, const char *key)
 
 bool object_del_acls(DB_TXN *txn, const char *bucket, const char *key)
 {
-	DB *acls = tdb.acls;
+	DB *acls = tdbrep.tdb.acls;
 	struct db_acl_key *akey;
 	size_t alloc_len;
 	DBT pkey;
@@ -163,8 +163,8 @@ bool object_del(struct client *cli, const char *user,
 	int rc;
 	enum errcode err = InternalError;
 	size_t alloc_len;
-	DB_ENV *dbenv = tdb.env;
-	DB *objs = tdb.objs;
+	DB_ENV *dbenv = tdbrep.tdb.env;
+	DB *objs = tdbrep.tdb.objs;
 	struct db_obj_key *okey;
 	struct db_obj_ent obje;
 	DBT pkey, pval;
@@ -326,9 +326,9 @@ static bool object_put_end(struct client *cli)
 	struct db_obj_ent oldobj;
 	bool delobj;
 	size_t alloc_len;
-	DB_ENV *dbenv = tdb.env;
+	DB_ENV *dbenv = tdbrep.tdb.env;
 	DBT pkey, pval;
-	DB *objs = tdb.objs;
+	DB *objs = tdbrep.tdb.objs;
 	DB_TXN *txn = NULL;
 	GByteArray *string_data;
 	GArray *string_lens;
@@ -786,7 +786,7 @@ static bool object_put_body(struct client *cli, const char *user,
 		return cli_err(cli, InternalError);
 	}
 
-	objid = objid_next(&tabled_srv.object_count, &tdb);
+	objid = objid_next(&tabled_srv.object_count, &tdbrep.tdb);
 
 	rc = open_chunks(&cli->out_ch, &tabled_srv.all_stor,
 			 cli, objid, content_len);
@@ -865,9 +865,9 @@ static bool object_put_acls(struct client *cli, const char *user,
 {
 	enum errcode err = InternalError;
 	enum ReqACLC canacl;
-	DB_ENV *dbenv = tdb.env;
+	DB_ENV *dbenv = tdbrep.tdb.env;
 	DB_TXN *txn = NULL;
-	DB *objs = tdb.objs;
+	DB *objs = tdbrep.tdb.objs;
 	char *hdr;
 	char timestr[64];
 	int rc;
@@ -1130,7 +1130,7 @@ static bool object_get_body(struct client *cli, const char *user,
 	bool access_ok, modified = true;
 	GString *extra_hdr;
 	size_t alloc_len;
-	DB *objs = tdb.objs;
+	DB *objs = tdbrep.tdb.objs;
 	struct db_obj_key *okey;
 	struct db_obj_ent *obj = NULL;
 	DBT pkey, pval;
diff --git a/server/replica.c b/server/replica.c
index ac14cb2..1b5e832 100644
--- a/server/replica.c
+++ b/server/replica.c
@@ -612,8 +612,8 @@ static void rep_scan_verify(struct rep_arg *arg,
 
 static void rep_add_nid(unsigned int klen, struct db_obj_key *key, uint32_t nid)
 {
-	DB_ENV *db_env = tdb.env;
-	DB *db_objs = tdb.objs;
+	DB_ENV *db_env = tdbrep.tdb.env;
+	DB *db_objs = tdbrep.tdb.objs;
 	DB_TXN *db_txn;
 	DBT pkey, pval;
 	struct db_obj_ent *obj;
@@ -749,8 +749,8 @@ static void rep_scan(struct rep_arg *arg)
 	g_mutex_unlock(kscan_mutex);
 
 	memset(&cur, 0, sizeof(struct cursor));	/* enough to construct */
-	cur.db_env = tdb.env;
-	cur.db_objs = tdb.objs;
+	cur.db_env = tdbrep.tdb.env;
+	cur.db_objs = tdbrep.tdb.objs;
 
 	kcnt = 0;
 	for (;;) {
diff --git a/server/server.c b/server/server.c
index 814afec..8859847 100644
--- a/server/server.c
+++ b/server/server.c
@@ -97,12 +97,15 @@ struct server tabled_srv = {
 	.config			= "/etc/tabled.conf",
 };
 
-struct tabledb tdb;
+struct tablerep tdbrep;
 
 enum {
 	TT_CMD_DUMP,
 	TT_CMD_TDBST_MASTER,
-	TT_CMD_TDBST_SLAVE
+	TT_CMD_TDBST_SLAVE,
+	TT_CMD_MASTER_LINK_RESET,
+	TT_CMD_LINK_SCRUB,
+	TT_CMDNUM
 };
 
 struct compiled_pat patterns[] = {
@@ -114,7 +117,11 @@ struct compiled_pat patterns[] = {
 };
 
 static char *state_name_tdb[ST_TDBNUM] = {
-	"Init", "Open", "Active", "Master", "Slave"
+	"Init", "Open", "Master", "Slave"
+};
+
+static char *cmd_name_tdb[TT_CMDNUM] = {
+	"Dump", "GoMaster", "GoSlave", "MasterLinkReset", "LinkScrub"
 };
 
 static struct {
@@ -340,7 +347,7 @@ static int authcheck(struct http_req *req, char *extra_bucket,
 	 * not match.
 	 */
 
-	rc = tdb.passwd->get(tdb.passwd, NULL, &key, &val, 0);
+	rc = tdbrep.tdb.passwd->get(tdbrep.tdb.passwd, NULL, &key, &val, 0);
 	if (rc) {
 		pass = strdup("");
 
@@ -350,7 +357,7 @@ static int authcheck(struct http_req *req, char *extra_bucket,
 			char s[64];
 
 			snprintf(s, 64, "get user '%s'", user);
-			tdb.passwd->err(tdb.passwd, rc, s);
+			tdbrep.tdb.passwd->err(tdbrep.tdb.passwd, rc, s);
 		}
 	} else {
 		pass = val.data;
@@ -387,8 +394,22 @@ static void stats_signal(int signo)
 
 static void stats_dump(void)
 {
-	applog(LOG_INFO, "STATE: TDB %s",
-	    state_name_tdb[tabled_srv.state_tdb]);
+	struct db_remote *rp;
+	GList *tmp;
+
+	applog(LOG_INFO, "TDB: group %s state %s host %s rep_port %d dbid %d%s",
+	       tabled_srv.group, state_name_tdb[tabled_srv.state_tdb],
+	       tabled_srv.ourhost, tabled_srv.rep_port, tdbrep.thisid,
+	       (tabled_srv.mc_delay)? " mc_delay": "");
+	for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) {
+		rp = tmp->data;
+		applog(LOG_INFO, "PN: name %s dbid %d", rp->name, rp->dbid);
+		if (rp->host)
+			applog(LOG_INFO, "PN: host %s port %d",
+			       rp->host, rp->port);
+		if (rp == tabled_srv.rep_master)
+			applog(LOG_INFO, "PN (master)");
+	}
 	applog(LOG_INFO,
 	       "STATS: poll %lu event %lu tcp_accept %lu opt_write %lu",
 	       tabled_srv.stats.poll,
@@ -403,11 +424,17 @@ static void stats_dump(void)
 
 bool stat_status(struct client *cli, GList *content)
 {
+	struct db_remote *rp;
+	GList *tmp;
 	char *str;
+	int rc;
 
 	/*
 	 * The loadavg is system dependent, we'll figure it out later.
 	 * On Linux, applications read from /proc/loadavg.
+	 *
+	 * The listening info duplicates the hostname until we split
+	 * the replication identifier from hostname.
 	 */
 	if (asprintf(&str,
 		     "<h1>Status</h1>"
@@ -415,11 +442,50 @@ bool stat_status(struct client *cli, GList *content)
 		     tabled_srv.ourhost, tabled_srv.port) < 0)
 		return false;
 	content = g_list_append(content, str);
+
 	if (asprintf(&str,
-		     "<p>State: TDB %s</p>\r\n",
-		     state_name_tdb[tabled_srv.state_tdb]) < 0)
+		     "<p>TDB: group %s "
+		     "state %s host %s rep_port %d dbid %d%s</p>\r\n",
+		     tabled_srv.group, state_name_tdb[tabled_srv.state_tdb],
+		     tabled_srv.ourhost, tabled_srv.rep_port, tdbrep.thisid,
+		     (tabled_srv.mc_delay)? " mc_delay": "") < 0)
 		return false;
 	content = g_list_append(content, str);
+
+	if (tabled_srv.rep_remotes) {
+		if (asprintf(&str, "<p>") < 0)
+			return false;
+		content = g_list_append(content, str);
+		for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) {
+			rp = tmp->data;
+			rc = asprintf(&str, "Peer: name %s dbid %d",
+				      rp->name, rp->dbid);
+			if (rc < 0)
+				return false;
+			content = g_list_append(content, str);
+			if (rp->host) {
+				rc = asprintf(&str, " host %s port %d",
+					      rp->host, rp->port);
+				if (rc < 0)
+					return false;
+				content = g_list_append(content, str);
+			}
+			if (rp == tabled_srv.rep_master) {
+				str = strdup(" (master)");
+				if (!str)
+					return false;
+				content = g_list_append(content, str);
+			}
+			rc = asprintf(&str, "<br />\r\n");
+			if (rc < 0)
+				return false;
+			content = g_list_append(content, str);
+		}
+		if (asprintf(&str, "</p>\r\n") < 0)
+			return false;
+		content = g_list_append(content, str);
+	}
+
 	if (asprintf(&str,
 		     "<p>Stats: "
 		     "poll %lu event %lu tcp_accept %lu opt_write %lu</p>\r\n"
@@ -1421,7 +1487,7 @@ static void add_chkpt_timer(void)
 
 static void tdb_checkpoint(int fd, short events, void *userdata)
 {
-	DB_ENV *dbenv = tdb.env;
+	DB_ENV *dbenv = tdbrep.tdb.env;
 	int rc;
 
 	if (debugging)
@@ -1436,29 +1502,50 @@ static void tdb_checkpoint(int fd, short events, void *userdata)
 	add_chkpt_timer();
 }
 
+static void add_reup_timer(void)
+{
+	static const struct timeval tv = { TABLED_REUP_SEC, 0 };
+
+	if (evtimer_add(&tabled_srv.reup_timer, &tv) < 0)
+		applog(LOG_WARNING, "unable to add reup timer");
+}
+
+static void tdb_reup(int fd, short events, void *userdata)
+{
+
+	if (tabled_srv.state_want == ST_W_MASTER &&
+	    tabled_srv.state_tdb == ST_TDB_MASTER) {
+		/*
+		 * An upgrade failed, retry.
+		 */
+		if (rtdb_restart(&tdbrep, true)) {
+			applog(LOG_WARNING, "Cannot restart to master");
+			add_reup_timer();
+		}
+	}
+}
+
 static void tdb_state_cb(enum db_event event)
 {
 	unsigned char cmd;
 
 	switch (event) {
 	case TDB_EV_ELECTED:
-		/*
-		 * Safe to stop ignoring bogus client indication,
-		 * so unmute us by advancing the state.
-		 */
-		if (tabled_srv.state_tdb == ST_TDB_OPEN)
-			tabled_srv.state_tdb = ST_TDB_ACTIVE;
+		/* Just ignore this, we only care for the end state. */
 		break;
 	case TDB_EV_CLIENT:
+		/* P3 */ applog(LOG_INFO, "TDB event: slave, state %s", state_name_tdb[tabled_srv.state_tdb]);
+		goto overmsg;
 	case TDB_EV_MASTER:
+		/* P3 */ applog(LOG_INFO, "TDB event: master, state %s", state_name_tdb[tabled_srv.state_tdb]);
+		overmsg:
 		/*
 		 * This callback runs on the context of the replication
 		 * manager thread, and calling any of our functions thus
 		 * turns our program into a multi-threaded one. Instead
 		 * we signal the main thread to do the processing.
 		 */
-		if (tabled_srv.state_tdb != ST_TDB_INIT &&
-		    tabled_srv.state_tdb != ST_TDB_OPEN) {
+		if (tabled_srv.state_tdb != ST_TDB_INIT) {
 			if (event == TDB_EV_MASTER)
 				cmd = TT_CMD_TDBST_MASTER;
 			else
@@ -1472,6 +1559,55 @@ static void tdb_state_cb(enum db_event event)
 	}
 }
 
+void cld_update_cb(void)
+{
+	switch (tabled_srv.state_want) {
+	case ST_W_MASTER:
+		if (tabled_srv.state_tdb == ST_TDB_MASTER) {
+			; /* CLD caught up to DB, better late than never */
+		} else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+			/* CLD tells us to upgrade, do it */
+			if (rtdb_restart(&tdbrep, true)) {
+				applog(LOG_WARNING,
+				       "Unable to restart to master");
+				/*
+				 * Don't try rtdb_fini here, will end in a hang.
+				 * Instead, retry endlessly until it succeeds.
+				 */
+				add_reup_timer();
+			}
+		} else {
+			applog(LOG_WARNING, "Want Master while in state %s",
+			       state_name_tdb[tabled_srv.state_tdb]);
+		}
+		break;
+	case ST_W_SLAVE:
+		if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+			; /* all good */
+		} else if (tabled_srv.state_tdb == ST_TDB_MASTER) {
+			/*
+			 * OK, this is bad. We lost our CLD session and some
+			 * other node went master on us. Even if we downgrade
+			 * the database now, some clients may have done some
+			 * operations while CLD was bouncing. Complain loudly.
+			 */
+			applog(LOG_WARNING,
+			       "Downgrading the database,"
+			       " data loss is possible");
+			if (rtdb_restart(&tdbrep, false)) {
+				tabled_srv.state_tdb = ST_TDB_INIT;
+				rtdb_fini(&tdbrep);
+			}
+		} else {
+			applog(LOG_WARNING, "Want Slave while in state %s",
+			       state_name_tdb[tabled_srv.state_tdb]);
+		}
+		break;
+	default:
+		;
+	}
+}
+
 /*
  * Due to the way storage_node management is tightly woven into the
  * server, the management of nodes is not in storage.c, which deals
@@ -1485,7 +1621,6 @@ int stor_update_cb(void)
 {
 	int num_up;
 	struct storage_node *stn;
-	unsigned int env_flags;
 
 	if (debugging)
 		applog(LOG_DEBUG, "Know of potential %d storage node(s)",
@@ -1518,15 +1653,13 @@ int stor_update_cb(void)
 	 * We initiate operations even if there's no redundancy in order
 	 * to permit bootstrapping and build-time self-checking.
 	 */
+/* P3 */ applog(LOG_INFO, "storage updated, TDB state %s", state_name_tdb[tabled_srv.state_tdb]);
 	if (tabled_srv.state_tdb == ST_TDB_INIT) {
 		tabled_srv.state_tdb = ST_TDB_OPEN;
-
-		env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
-		if (tdb_init(&tdb, tabled_srv.tdb_dir, NULL,
-			     env_flags, "tabled", true,
-			     tabled_srv.rep_remotes,
-			     tabled_srv.ourhost, tabled_srv.rep_port,
-			     tdb_state_cb)) {
+		if (rtdb_start(&tdbrep, tabled_srv.tdb_dir,
+			      tabled_srv.state_want == ST_W_MASTER,
+			      tabled_srv.rep_master,
+			      tabled_srv.rep_port, tdb_state_cb)) {
 			tabled_srv.state_tdb = ST_TDB_INIT;
 			applog(LOG_ERR, "Failed to open TDB, limping");
 		}
@@ -1535,10 +1668,122 @@ int stor_update_cb(void)
 		 * FIXME This is where we should process redundancy decreases.
 		 */
 		;
+	} else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+		if (tabled_srv.state_want == ST_W_MASTER) {
+			if (rtdb_restart(&tdbrep, true)) {
+				applog(LOG_WARNING,
+				       "Failed to restart to master");
+				add_reup_timer();
+			}
+		}
 	}
 	return num_up;
 }
 
+int tdb_slave_login_cb(int srcid)
+{
+	struct db_remote *master;
+
+	master = tabled_srv.rep_master;
+	if (!master) {
+		applog(LOG_INFO, "No master at login");
+		return -1;
+	}
+	if (master->dbid == 0) {
+		applog(LOG_INFO, "Master dbid %d", srcid);
+	} else {
+		if (master->dbid != srcid) {
+			/*
+			 * This is probably bad news. Perhaps master rebooted
+			 * on the other side of the network partition and yet
+			 * somehow won a lock in CLD, or something even weirder.
+			 * But we don't know.
+			 */
+			applog(LOG_INFO,
+			       "Master switch from dbid %d to dbid %d",
+			       master->dbid, srcid);
+		}
+	}
+	master->dbid = srcid;
+
+	if (tabled_srv.state_tdb == ST_TDB_OPEN) {
+		applog(LOG_INFO, "Established link, master %s dbid %d",
+		       master->name, master->dbid);
+		if (tabled_srv.state_want != ST_W_SLAVE) {
+			applog(LOG_ERR, "Unexpected TDB state %s, limping",
+			       state_name_tdb[tabled_srv.state_tdb]);
+			rtdb_fini(&tdbrep);
+			tabled_srv.state_tdb = ST_TDB_INIT;
+			return -1;
+		}
+		if (rtdb_start(&tdbrep, tabled_srv.tdb_dir,
+			       false,
+			       master,
+			       tabled_srv.rep_port, tdb_state_cb)) {
+			tabled_srv.state_tdb = ST_TDB_INIT;
+			applog(LOG_ERR, "Failed to open TDB, limping");
+			return -1;
+		}
+	} else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+		applog(LOG_INFO, "Recovered master connection");
+	} else {
+		applog(LOG_INFO, "Confused about connections");
+	}
+	return 0;
+}
+
+void tdb_slave_disc_cb(void)
+{
+	static const struct timeval tv = { TABLED_MCWAIT_SEC, 0 };
+
+	if (tabled_srv.mc_delay)
+		return;
+	evtimer_add(&tabled_srv.mc_timer, &tv);
+	tabled_srv.mc_delay = true;
+}
+
+static void tdb_mc_delay(int fd, short events, void *userdata)
+{
+	static const unsigned char cmd = TT_CMD_MASTER_LINK_RESET;
+
+	tabled_srv.mc_delay = false;
+	write(tabled_srv.ev_pipe[1], &cmd, 1);
+}
+
+void tdb_conn_scrub_cb(void)
+{
+	unsigned char cmd;
+
+	cmd = TT_CMD_LINK_SCRUB;
+	write(tabled_srv.ev_pipe[1], &cmd, 1);
+}
+
+struct db_remote *tdb_find_remote_byname(const char *name)
+{
+	struct db_remote *rp;
+	GList *tmp;
+
+	for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) {
+		rp = tmp->data;
+		if (strcmp(rp->name, name) == 0)
+			return rp;
+	}
+	return NULL;
+}
+
+struct db_remote *tdb_find_remote_byid(int id)
+{
+	struct db_remote *rp;
+	GList *tmp;
+
+	for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) {
+		rp = tmp->data;
+		if (rp->dbid == id)
+			return rp;
+	}
+	return NULL;
+}
+
 static int net_open_socket(int addr_fam, int sock_type, int sock_prot,
 			   int addr_len, void *addr_ptr, bool is_status)
 {
@@ -1833,26 +2078,66 @@ static void compile_patterns(void)
 	}
 }
 
-static void tdb_state_process(enum st_tdb new_state)
+static void tdb_startup(void)
 {
 	unsigned int db_flags;
 
-	if (debugging)
-		applog(LOG_DEBUG, "TDB state > %s", state_name_tdb[new_state]);
-	if ((new_state == ST_TDB_MASTER || new_state == ST_TDB_SLAVE) &&
-	    tabled_srv.state_tdb == ST_TDB_ACTIVE) {
+	db_flags = DB_CREATE | DB_THREAD;
+	if (tdb_up(&tdbrep.tdb, db_flags))
+		return;
+	if (objid_init(&tabled_srv.object_count, &tdbrep.tdb)) {
+		tdb_down(&tdbrep.tdb);
+		return;
+	}
+	add_chkpt_timer();
+	rep_start();
+	net_listen_client();
+}
 
-		db_flags = DB_CREATE | DB_THREAD;
-		if (tdb_up(&tdb, db_flags))
-			return;
+static void tdb_state_process(enum st_tdb new_state)
+{
 
-		if (objid_init(&tabled_srv.object_count, &tdb)) {
-			tdb_down(&tdb);
-			return;
+	applog(LOG_INFO, "TDB state %s > %s",
+	       state_name_tdb[tabled_srv.state_tdb], state_name_tdb[new_state]);
+
+	if (tabled_srv.state_tdb == ST_TDB_OPEN) {
+		if (new_state == ST_TDB_MASTER) {
+			if (tabled_srv.state_want == ST_W_MASTER) {
+				tdb_startup();
+			} else {
+				/*
+				 * We want slave if we cannot connect to CLD,
+				 * or we cannot lock the master file, which
+				 * means that another master may exist.
+				 * But the db goes master on us, so
+				 * either the other master is dead or we're
+				 * misconfigured so DBs cannot talk.
+				 * Either way, we should poke db until the
+				 * desired result is accomplished. XXX
+				 */
+				applog(LOG_INFO, "TDB went Master on us");
+			}
+		} else if (new_state == ST_TDB_SLAVE) {
+			applog(LOG_INFO, "TDB went Slave, so whatever");
+			;
+		} else {
+			applog(LOG_ERR, "TDB went to unexpected state");
+		}
+	} else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+		if (new_state == ST_TDB_MASTER) {
+			if (tabled_srv.state_want == ST_W_MASTER) {
+				tdb_startup();
+			} else {
+				/*
+				 * This is either a net split or CLD is doing
+				 * its timeouts and so we do not want to be
+				 * a master yet.
+				 */
+				applog(LOG_ERR, "TDB upgraded on us");
+			}
+		} else {
+			applog(LOG_ERR, "TDB is confused");
 		}
-		add_chkpt_timer();
-		rep_start();
-		net_listen_client();
 	}
 }
 
@@ -1871,6 +2156,11 @@ static void internal_event(int fd, short events, void *userdata)
 		abort();
 	}
 
+	if (debugging) {
+		applog(LOG_DEBUG, "Context Event %s, TDB state %s",
+		    cmd_name_tdb[cmd], state_name_tdb[tabled_srv.state_tdb]);
+	}
+
 	switch (cmd) {
 	case TT_CMD_DUMP:
 		stats_dump();
@@ -1890,6 +2180,15 @@ static void internal_event(int fd, short events, void *userdata)
 		}
 		break;
 
+	case TT_CMD_MASTER_LINK_RESET:
+		rtdb_mc_reset(&tdbrep, tabled_srv.state_want == ST_W_MASTER,
+			      tabled_srv.rep_master, tabled_srv.rep_port);
+		break;
+
+	case TT_CMD_LINK_SCRUB:
+		rtdb_dbc_scrub(&tdbrep);
+		break;
+
 	default:
 		applog(LOG_WARNING, "%s BUG: command 0x%x", __func__, cmd);
 		break;
@@ -1905,6 +2204,7 @@ int main (int argc, char *argv[])
 	INIT_LIST_HEAD(&tabled_srv.all_stor);
 	INIT_LIST_HEAD(&tabled_srv.write_compl_q);
 	tabled_srv.state_tdb = ST_TDB_INIT;
+	tabled_srv.rep_next_id = DBID_MIN;
 
 	/* isspace() and strcasecmp() consistency requires this */
 	setlocale(LC_ALL, "C");
@@ -1978,6 +2278,8 @@ int main (int argc, char *argv[])
 	tabled_srv.evbase_main = event_init();
 	event_base_rep = event_base_new();
 	evtimer_set(&tabled_srv.chkpt_timer, tdb_checkpoint, NULL);
+	evtimer_set(&tabled_srv.mc_timer, tdb_mc_delay, NULL);
+	evtimer_set(&tabled_srv.reup_timer, tdb_reup, NULL);
 
 	/* set up internal communication pipe */
 	if (pipe(tabled_srv.ev_pipe) < 0) {
@@ -1991,6 +2293,13 @@ int main (int argc, char *argv[])
 		goto err_pevt;
 	}
 
+	/* late-construct structures with allocations */
+	if (rtdb_init(&tdbrep, tabled_srv.ourhost)) {
+		applog(LOG_WARNING, "rtdb_init");
+		rc = 1;
+		goto err_rtdb;
+	}
+
 	/* set up server networking */
 	if (tabled_srv.status_port) {
 		if (net_open_known(tabled_srv.status_port, true) == 0)
@@ -2000,7 +2309,8 @@ int main (int argc, char *argv[])
 	if (rc)
 		goto err_out_net;
 
-	if (cld_begin(tabled_srv.ourhost, tabled_srv.group, verbose) != 0) {
+	if (cld_begin(tabled_srv.ourhost, tabled_srv.group,
+		      tabled_srv.rep_name, verbose) != 0) {
 		rc = 1;
 		goto err_cld_session;
 	}
@@ -2023,13 +2333,13 @@ err_cld_session:
 err_out_net:
 	if (tabled_srv.state_tdb == ST_TDB_MASTER ||
 	    tabled_srv.state_tdb == ST_TDB_SLAVE) {
-		tdb_down(&tdb);
-		tdb_fini(&tdb);
-	} else if (tabled_srv.state_tdb == ST_TDB_OPEN ||
-		   tabled_srv.state_tdb == ST_TDB_ACTIVE) {
-		tdb_fini(&tdb);
+		tdb_down(&tdbrep.tdb);
+		rtdb_fini(&tdbrep);
+	} else if (tabled_srv.state_tdb == ST_TDB_OPEN) {
+		rtdb_fini(&tdbrep);
 	}
-/* err_tdb_init: */
+err_rtdb:
+	event_del(&tabled_srv.pevt);
 err_pevt:
 	close(tabled_srv.ev_pipe[0]);
 	close(tabled_srv.ev_pipe[1]);
diff --git a/server/tabled.h b/server/tabled.h
index ff419e3..c90511c 100644
--- a/server/tabled.h
+++ b/server/tabled.h
@@ -45,6 +45,8 @@ enum {
 
 	TABLED_CHKPT_SEC	= 60 * 5,	/* secs between db4 chkpt */
 	TABLED_RESCAN_SEC	= 60*3 + 7,	/* secs btw key rescans */
+	TABLED_MCWAIT_SEC	= 35,		/* secs to moderate reconn. */
+	TABLED_REUP_SEC		= 35,		/* secs to retry rtdb_restart */
 
 	CHUNK_REBOOT_TIME	= 3*60,		/* secs to declare chunk dead */
 
@@ -200,8 +202,12 @@ struct client {
 	char			req_buf[CLI_REQ_BUF_SZ]; /* input buffer */
 };
 
+enum st_want {
+	ST_W_INIT, ST_W_MASTER, ST_W_SLAVE
+};
+
 enum st_tdb {
-	ST_TDB_INIT, ST_TDB_OPEN, ST_TDB_ACTIVE, ST_TDB_MASTER, ST_TDB_SLAVE,
+	ST_TDB_INIT, ST_TDB_OPEN, ST_TDB_MASTER, ST_TDB_SLAVE,
 	ST_TDBNUM
 };
 
@@ -218,6 +224,17 @@ struct server_stats {
 	unsigned long		max_write_buf;
 };
 
+#define DBID_NONE      0
+#define DBID_MIN       2
+#define DBID_MAX     105
+
+struct db_remote {		/* other DB nodes */
+	char		*name;			/* do not resolve as a host */
+	char		*host;
+	unsigned short	port;
+	int		dbid;			/* signed in db4, traditional */
+};
+
 struct listen_cfg {
 	/* bool			encrypt; */
 	/* char			*host; */
@@ -233,6 +250,8 @@ struct server {
 	int			ev_pipe[2];
 	struct event		pevt;
 	struct list_head	write_compl_q;	/* list of done writes */
+	bool			mc_delay;
+	struct event		mc_timer;
 
 	char			*config;	/* config file (static) */
 
@@ -242,6 +261,7 @@ struct server {
 	char			*port_file;
 	char			*chunk_user;	/* username for stc_new */
 	char			*chunk_key;	/* key for stc_new */
+	char			*rep_name;	/* db4 replication name */
 	unsigned short		rep_port;	/* db4 replication port */
 	char			*status_port;	/* status webserver */
 	char			*group;		/* our group (both T and Ch) */
@@ -249,12 +269,16 @@ struct server {
 	char			*ourhost;	/* use this if DB master */
 	struct database		*db;		/* database handle */
 	GList			*rep_remotes;
+	struct db_remote	*rep_master;	/* if we're slave */
+	int			rep_next_id;
+	struct event		reup_timer;
 
 	GList			*sockets;
 	struct list_head	all_stor;	/* struct storage_node */
 	int			num_stor;	/* number of storage_node's  */
 	uint64_t		object_count;
 
+	enum st_want		state_want;
 	enum st_tdb		state_tdb;
 	enum st_net		state_net;
 
@@ -263,7 +287,55 @@ struct server {
 	struct server_stats	stats;		/* global statistics */
 };
 
-extern struct tabledb tdb;
+/*
+ * Low-level channel, for both sides.
+ *
+ * The combined link state confuses session (e.g. login) and the framing, which
+ * is not pretty but works. At least we have a separate link-state struct.
+ *
+ * In a settled state, db_conn corresponds 1:1 to db_remote, but
+ * it's not necessarily so when connections are being established.
+ */
+enum dbc_state {  DBC_INIT, DBC_LOGIN, DBC_OPEN, DBC_DEAD };
+
+struct db_link {
+	int		fd;
+	enum dbc_state	state;
+
+	bool		writing;
+	struct event	wrev;			/* when writing */
+	unsigned char	*obuf;
+	int		obuflen;
+	int		done, togo;
+
+	struct event	rcev;			/* whenever fd >= 0 */
+	unsigned char	*ibuf;
+	int		ibuflen;		/* currently allocated ibuf */
+	int		cnt;			/* currently in ibuf */
+	int		explen;			/* expected length */
+};
+
+struct db_conn {		/* a connection with other DB node */
+	struct tablerep	*rtdb;
+	struct db_remote *remote;
+	struct list_head link;
+
+	struct db_link	lk;
+};
+
+struct tablerep {
+	struct tabledb	tdb;
+	const char	*thisname;
+	int		thisid;
+
+	int		sockfd4, sockfd6;
+	struct event	lsev4, lsev6;
+	struct list_head conns;	// struct db_conn
+
+	struct db_conn	*mdbc;
+};
+
+extern struct tablerep tdbrep;
 
 /* bucket.c */
 extern bool has_access(const char *user, const char *bucket, const char *key,
@@ -295,7 +367,8 @@ extern void cli_in_end(struct client *cli);
 
 /* cldu.c */
 extern void cld_init(void);
-extern int cld_begin(const char *fqdn, const char *group, int verbose);
+extern int cld_begin(const char *fqdn, const char *group, const char *name,
+		int verbose);
 extern void cldu_add_host(const char *host, unsigned int port);
 extern void cld_end(void);
 
@@ -332,7 +405,13 @@ extern bool cli_write_start(struct client *cli);
 extern bool cli_write_run_compl(void);
 extern int cli_req_avail(struct client *cli);
 extern void applog(int prio, const char *fmt, ...);
+extern void cld_update_cb(void);
 extern int stor_update_cb(void);
+extern int tdb_slave_login_cb(int srcid);
+extern void tdb_slave_disc_cb(void);
+extern void tdb_conn_scrub_cb(void);
+extern struct db_remote *tdb_find_remote_byname(const char *name);
+extern struct db_remote *tdb_find_remote_byid(int id);
 
 /* status.c */
 extern bool stat_evt_http_req(struct client *cli, unsigned int events);
@@ -374,4 +453,16 @@ extern void rep_start(void);
 extern void rep_stats(void);
 extern bool rep_status(struct client *cli, GList *content);
 
+/* metarep.c */
+extern int rtdb_init(struct tablerep *rtdb, const char *thishost);
+extern int rtdb_start(struct tablerep *rtdb, const char *db_home,
+	bool we_are_master,
+	struct db_remote *rep_master, unsigned short rep_port,
+	void (*cb)(enum db_event));
+extern void rtdb_mc_reset(struct tablerep *rtdb, bool we_are_master,
+	struct db_remote *rep_master, unsigned short rep_port);
+extern void rtdb_dbc_scrub(struct tablerep *rtdb);
+extern int rtdb_restart(struct tablerep *rtdb, bool we_are_master);
+extern void rtdb_fini(struct tablerep *rtdb);
+
 #endif /* __TABLED_H__ */
diff --git a/server/tdbadm.c b/server/tdbadm.c
index 86fa4b3..4bd26cc 100644
--- a/server/tdbadm.c
+++ b/server/tdbadm.c
@@ -45,11 +45,10 @@ enum various_modes {
 static int mode_adm;
 static unsigned long invalid_lines;
 static char *tdb_dir;
-static unsigned short rep_port;
 static char *config = "/etc/tabled.conf";
-static char *ourhost;
 
 static struct tabledb tdb;
+static bool tdb_is_master;
 
 const char *argp_program_version = PACKAGE_VERSION;
 
@@ -110,7 +109,6 @@ static void cfg_elm_end(GMarkupParseContext *context,
 {
 	struct config_context *cc = user_data;
 	struct stat statb;
-	int n;
 
 	if (!strcmp(element_name, "TDB") && cc->text) {
 		if (!tdb_dir) {
@@ -134,25 +132,6 @@ static void cfg_elm_end(GMarkupParseContext *context,
 		cc->text = NULL;
 	}
 
-	else if (!strcmp(element_name, "ForceHost") && cc->text) {
-		free(ourhost);
-		ourhost = cc->text;
-		cc->text = NULL;
-	}
-
-	else if (!strcmp(element_name, "TDBRepPort") && cc->text) {
-		n = strtol(cc->text, NULL, 10);
-		if (n <= 0 || n >= 65536) {
-			fprintf(stderr, "warning: "
-			       "TDBRepPort '%s' invalid, ignoring", cc->text);
-			free(cc->text);
-			cc->text = NULL;
-			return;
-		}
-		rep_port = n;
-		free(cc->text);
-		cc->text = NULL;
-	}
 }
 
 static bool str_n_isspace(const char *s, size_t n)
@@ -198,8 +177,6 @@ static void read_config(void)
 
 	memset(&ctx, 0, sizeof(struct config_context));
 
-	rep_port = 8083;
-
 	if (!g_file_get_contents(config, &text, &len, NULL)) {
 		fprintf(stderr, "failed to read config file %s\n", config);
 		exit(1);
@@ -603,10 +580,15 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
 	return 0;
 }
 
+static void tdb_state_cb(enum db_event event)
+{
+	if (event == TDB_EV_MASTER)
+		tdb_is_master = true;
+}
+
 int main(int argc, char *argv[])
 {
-	char hostname[64];
-	unsigned int env_flags, db_flags;
+	unsigned int db_flags;
 	error_t aprc;
 	int rc = 1;
 
@@ -621,21 +603,12 @@ int main(int argc, char *argv[])
 	if (!tdb_dir)
 		die("no tdb dir (-t) specified\n");
 
-	if (ourhost)
-		strcpy(hostname, ourhost);
-	else if (gethostname(hostname, sizeof(hostname)) < 0) {
-		fprintf(stderr, "gethostname failed: %s\n", strerror(errno));
-		return 1;
-	}
-
-	env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
-	if (tdb_init(&tdb, tdb_dir, NULL, env_flags,
-		     "tdbadm", false, NULL, hostname, rep_port, NULL))
+	if (tdb_init(&tdb, tdb_dir, NULL, "tdbadm", false,
+		     0, NULL, true, tdb_state_cb))
 		goto err_dbinit;
 
-	/* Usually takes about 12s */
-	/* FIXME don't peek into private parts of tdb struct, use state_cb */
-	while (!tdb.is_master)
+	/* Usually takes about 12s, if vote is involved. */
+	while (!tdb_is_master)
 		sleep(2);
 
 	db_flags = DB_CREATE | DB_THREAD;
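
For readers following the server.c hunks: cld_update_cb() compares the state CLD wants (state_want) with the state the database is actually in (state_tdb) and picks an action. The standalone sketch below restates that decision as a pure function, which may be easier to review than the nested ifs. The two enums are copied from the tabled.h hunk; enum rec_action and reconcile() are hypothetical names introduced only for this illustration and are not part of the patch.

```c
#include <assert.h>

/* Copied from the server/tabled.h hunk of the patch. */
enum st_want { ST_W_INIT, ST_W_MASTER, ST_W_SLAVE };
enum st_tdb { ST_TDB_INIT, ST_TDB_OPEN, ST_TDB_MASTER, ST_TDB_SLAVE,
	      ST_TDBNUM };

/* Hypothetical action codes; the patch calls the functions directly. */
enum rec_action {
	REC_NONE,	/* states agree, or nothing is wanted yet */
	REC_UPGRADE,	/* patch: rtdb_restart(&tdbrep, true) */
	REC_DOWNGRADE,	/* patch: rtdb_restart(&tdbrep, false), after a warning */
	REC_COMPLAIN	/* patch: applog(LOG_WARNING, "Want ... while in state ...") */
};

/* Decide what to do when CLD reports a wanted state. */
static enum rec_action reconcile(enum st_want want, enum st_tdb have)
{
	assert(have < ST_TDBNUM);

	switch (want) {
	case ST_W_MASTER:
		if (have == ST_TDB_MASTER)
			return REC_NONE;	/* CLD caught up to the DB */
		if (have == ST_TDB_SLAVE)
			return REC_UPGRADE;
		return REC_COMPLAIN;
	case ST_W_SLAVE:
		if (have == ST_TDB_SLAVE)
			return REC_NONE;
		if (have == ST_TDB_MASTER)
			return REC_DOWNGRADE;	/* data loss is possible */
		return REC_COMPLAIN;
	default:
		return REC_NONE;	/* ST_W_INIT: no decision yet */
	}
}
```

In this sketch the retry timer (add_reup_timer) and the rtdb_fini fallback are left out on purpose: they are what the patch does when the chosen action fails, not part of choosing it.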
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html
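
A note on the ev_pipe writes in tdb_state_cb(), tdb_mc_delay() and tdb_conn_scrub_cb(): they hand a one-byte command to the main event loop instead of doing the work in the calling context. A minimal sketch of that pattern follows, assuming an ordinary POSIX pipe. The names post_cmd(), fetch_cmd() and the CMD_* values are hypothetical and exist only in this example; in the patch the commands are TT_CMD_* and the reader is internal_event().

```c
#include <assert.h>
#include <unistd.h>

/* Hypothetical command codes standing in for TT_CMD_* from the patch. */
enum { CMD_DUMP, CMD_GO_MASTER, CMD_GO_SLAVE, CMD_NUM };

/*
 * Producer side: queue one command byte for the main loop.
 * Returns 0 on success, -1 if the byte could not be written.
 */
static int post_cmd(int wfd, unsigned char cmd)
{
	assert(cmd < CMD_NUM);
	return write(wfd, &cmd, 1) == 1 ? 0 : -1;
}

/*
 * Consumer side: called when the read end is readable.
 * Returns the command code, or -1 on a short read or error.
 */
static int fetch_cmd(int rfd)
{
	unsigned char cmd;

	if (read(rfd, &cmd, 1) != 1)
		return -1;
	return cmd;
}
```

Unlike the patch, which ignores the result of write(), this sketch reports a failed write to the caller. That is a choice made for the example, not a claim about what tabled should do.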

