[PATCH 3/3] CLD: enable replication on server and client

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Enable master/slave replication in CLD, using db4's replication manager.

On the server side, you may input a list of peer servers via DNS SRV
records, or manually on the command line for testing.  Each peer
participates in replication elections, and functions as a slave until
the replication engine decides it is to be elected master.  Under the
hood, db4 uses Paxos in versions >= 4.7.  All CLD network operations
are already wrapped inside db4 transactions, so this merely extends
current reliability.  The server assumes a cell of 5 peers by default
(so a quorum of 3, i.e. (PEERS/2)+1), but this is easily changed with
the --cell-size option.

For the most part, this piece of code has been around for a little
while.  It has now been updated for all recent hail changes.  The new
bits are client side work that enables replication/failover to work
smoothly and reliably.

On the client side, the main change is passing around a list of
struct cldc_host entries.  All users of the library already built such
a list internally, and then hobbled themselves by calling an interface
that accepted only a single hostname/port pair.  This limitation has
been removed, properly exposing the entire list of CLD servers to the
CLD client API.

CLD clients will try a TCP connection to each CLD server, in priority
order, until one succeeds; a failed connection moves on to the next
server.  Ditto with in-session server death; clients will move back into
seeking-a-master state.  CLD clients always seek the master for all
operations; no attempt is made to divide the load by sending clients to
slave servers.

This is all moving towards the goal of having clients connect to a cell
(aka a cluster), rather than a host.  CLD users will specify

	cld://mycluster.example.com/tabled/1234/ABCD

and the client lib will perform a DNS SRV lookup on
"mycluster.example.com".  The DNS SRV lookup returns one or more hosts
and priorities, each of which is in turn looked up with getaddrinfo(3).

As an added reliability feature, stolen from many current P2P
applications, if you reach at least one CLD server, you can download a
host list from that server.  This makes sure you can bootstrap into the
cloud regardless of how poorly your systems are running :)  BitTorrent
users are already familiar with "peer exchange."

Unlike the previous two patches, this code is hot off the presses, and
probably still has bugs (hence the "NOT-signed-off-by" below).

NOT-Signed-off-by: Jeff Garzik <jgarzik@xxxxxxxxxx>
---
 chunkd/cldu.c   |   13 +-
 cld/cld.h       |   22 +++
 cld/cldb.c      |   69 ++++++++++--
 cld/cldb.h      |    9 +
 cld/cldbadm.c   |    6 -
 cld/server.c    |  313 ++++++++++++++++++++++++++++++++++++++++++++++++++++----
 include/cldc.h  |    3 
 include/ncld.h  |    3 
 lib/cldc-tcp.c  |   21 +++
 lib/cldc.c      |   22 ---
 tools/cldcli.c  |    4 
 tools/cldfuse.c |    5 
 12 files changed, 423 insertions(+), 67 deletions(-)

diff --git a/chunkd/cldu.c b/chunkd/cldu.c
index 41f94b5..b17643e 100644
--- a/chunkd/cldu.c
+++ b/chunkd/cldu.c
@@ -47,6 +47,8 @@ struct cld_session {
 	int			actx;		/* Active host cldv[actx] */
 	struct cld_host		cldv[N_CLD];
 
+	GList			*host_list;
+
 	int			event_pipe[2];
 	struct event		ev;
 	struct event		ev_timer;
@@ -357,7 +359,7 @@ static int cldu_set_cldc(struct cld_session *cs, int newactive)
 		applog(LOG_INFO, "Selected CLD host %s port %u",
 		       hp->host, hp->port);
 
-	cs->nsess = ncld_sess_open(hp->host, hp->port, &error,
+	cs->nsess = ncld_sess_open(cs->host_list, &error,
 				   cldu_sess_event, cs, "tabled", "tabled",
 				   &cldu_hail_log);
 	if (cs->nsess == NULL) {
@@ -505,8 +507,7 @@ int cld_begin(const char *thishost, uint32_t nid, char *infopath,
 			goto err_addr;
 		}
 
-		/* copy host_list into cld_session host array,
-		 * taking ownership of alloc'd strings along the way
+		/* copy host_list into cld_session host array
 		 */
 		i = 0;
 		for (tmp = host_list; tmp; tmp = tmp->next) {
@@ -514,15 +515,13 @@ int cld_begin(const char *thishost, uint32_t nid, char *infopath,
 			if (i < N_CLD) {
 				memcpy(&cs->cldv[i].h, hp,
 				       sizeof(struct cldc_host));
+				cs->cldv[i].h.host = strdup(hp->host);
 				cs->cldv[i].known = 1;
 				i++;
-			} else {
-				free(hp->host);
 			}
-			free(hp);
 		}
 
-		g_list_free(host_list);
+		cs->host_list = host_list;
 	}
 
 	if (pipe(cs->event_pipe) < 0) {
diff --git a/cld/cld.h b/cld/cld.h
index b1f9bbf..c5c38b5 100644
--- a/cld/cld.h
+++ b/cld/cld.h
@@ -44,6 +44,15 @@ enum {
 	SFL_FOREGROUND		= (1 << 0),	/* run in foreground */
 };
 
+enum st_cldb {
+	ST_CLDB_INIT,
+	ST_CLDB_OPEN,
+	ST_CLDB_ACTIVE,
+	ST_CLDB_MASTER,
+	ST_CLDB_SLAVE,
+	ST_CLDBNUM
+};
+
 struct atcp_read {
 	void			*buf;
 	unsigned int		buf_size;
@@ -142,6 +151,19 @@ struct server {
 
 	struct event		chkpt_timer;	/* db4 checkpoint timer */
 
+	unsigned short		rep_port;	/* db4 replication port */
+
+	char			*myhost;
+	char			*force_myhost;
+	GList			*rep_remotes;
+
+	unsigned int		n_peers;	/* total peers in cell */
+
+	int			ev_pipe[2];	/* internal event pipe */
+	struct event		pipe_ev;
+
+	enum st_cldb		state_cldb;	/* db & replication state */
+
 	struct server_stats	stats;		/* global statistics */
 };
 
diff --git a/cld/cldb.c b/cld/cldb.c
index f8afb61..671a4c2 100644
--- a/cld/cldb.c
+++ b/cld/cldb.c
@@ -29,8 +29,6 @@
 #include <cld-private.h>
 #include "cld.h"
 
-static int cldb_up(struct cldb *cldb, unsigned int flags);
-
 /*
  * db4 page sizes for our various databases.  Filesystem block size
  * is recommended, so 4096 was chosen (default ext3 block size).
@@ -206,6 +204,30 @@ err_out:
 	return -EIO;
 }
 
+static int add_remote_sites(DB_ENV *dbenv, GList *remotes, int *nsites)
+{
+	int rc;
+	struct db_remote *rp;
+	GList *tmp;
+
+	*nsites = 0;
+	for (tmp = remotes; tmp; tmp = tmp->next) {
+		rp = tmp->data;
+
+		rc = dbenv->repmgr_add_remote_site(dbenv, rp->host, rp->port,
+						   NULL, 0);
+		if (rc) {
+			dbenv->err(dbenv, rc,
+				   "dbenv->add.remote.site host %s port %u",
+				   rp->host, rp->port);
+			return rc;
+		}
+		(*nsites)++;
+	}
+
+	return 0;
+}
+
 static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
 {
 	struct cldb *cldb = dbenv->app_private;
@@ -233,12 +255,13 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
 
 int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
 	      unsigned int env_flags, const char *errpfx, bool do_syslog,
-	      unsigned int flags, void (*cb)(enum db_event))
+	      GList *remotes, char *rep_host, unsigned short rep_port,
+	      int n_peers, void (*cb)(enum db_event))
 {
-	int rc;
+	int rc, nsites = 0;
 	DB_ENV *dbenv;
 
-	cldb->is_master = true;
+	cldb->is_master = false;
 	cldb->home = db_home;
 	cldb->state_cb = cb;
 
@@ -285,25 +308,55 @@ int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
 		cldb->keyed = true;
 	}
 
+	rc = dbenv->repmgr_set_local_site(dbenv, rep_host, rep_port, 0);
+	if (rc) {
+		dbenv->err(dbenv, rc, "dbenv->set_local_site");
+		goto err_out;
+	}
+
 	rc = dbenv->set_event_notify(dbenv, db4_event);
 	if (rc) {
 		dbenv->err(dbenv, rc, "dbenv->set_event_notify");
 		goto err_out;
 	}
 
+	rc = dbenv->rep_set_priority(dbenv, 100);
+	if (rc) {
+		dbenv->err(dbenv, rc, "dbenv->rep_set_priority");
+		goto err_out;
+	}
+
+	rc = dbenv->rep_set_nsites(dbenv, n_peers);
+	if (rc) {
+		dbenv->err(dbenv, rc, "dbenv->rep_set_nsites");
+		goto err_out;
+	}
+
+	rc = dbenv->repmgr_set_ack_policy(dbenv, DB_REPMGR_ACKS_QUORUM);
+	if (rc) {
+		dbenv->err(dbenv, rc, "dbenv->rep_ack_policy");
+		goto err_out;
+	}
+
 	/* init DB transactional environment, stored in directory db_home */
 	env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
-	env_flags |= DB_INIT_TXN;
+	env_flags |= DB_INIT_TXN | DB_INIT_REP;
 	rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
 	if (rc) {
 		dbenv->err(dbenv, rc, "dbenv->open");
 		goto err_out;
 	}
 
-	rc = cldb_up(cldb, flags);
+	rc = add_remote_sites(dbenv, remotes, &nsites);
 	if (rc)
 		goto err_out;
 
+	rc = dbenv->repmgr_start(dbenv, 2, DB_REP_ELECTION);
+	if (rc) {
+		dbenv->err(dbenv, rc, "dbenv->repmgr_start");
+		goto err_out;
+	}
+
 	return 0;
 
 err_out:
@@ -314,7 +367,7 @@ err_out:
 /*
  * open databases
  */
-static int cldb_up(struct cldb *cldb, unsigned int flags)
+int cldb_up(struct cldb *cldb, unsigned int flags)
 {
 	DB_ENV *dbenv = cldb->env;
 	int rc;
diff --git a/cld/cldb.h b/cld/cldb.h
index 0d8e618..0fec486 100644
--- a/cld/cldb.h
+++ b/cld/cldb.h
@@ -104,6 +104,11 @@ enum db_event {
 	CLDB_EV_NONE, CLDB_EV_CLIENT, CLDB_EV_MASTER, CLDB_EV_ELECTED
 };
 
+struct db_remote {	/* remotes for cldb_init */
+	char		*host;
+	unsigned short	port;
+};
+
 struct cldb {
 	bool		is_master;
 	bool		keyed;			/* using encryption? */
@@ -130,7 +135,9 @@ struct cldb {
 
 extern int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
 	      unsigned int env_flags, const char *errpfx, bool do_syslog,
-	      unsigned int flags, void (*cb)(enum db_event));
+	      GList *remotes, char *rep_host, unsigned short rep_port,
+	      int n_peers, void (*cb)(enum db_event));
+extern int cldb_up(struct cldb *cldb, unsigned int flags);
 extern void cldb_down(struct cldb *cldb);
 extern void cldb_fini(struct cldb *cldb);
 
diff --git a/cld/cldbadm.c b/cld/cldbadm.c
index 7176f3b..1c0f26a 100644
--- a/cld/cldbadm.c
+++ b/cld/cldbadm.c
@@ -78,7 +78,8 @@ int main(int argc, char *argv[])
 	}
 
 	if (cldb_init(&cld_adm.cldb, cld_adm.data_dir, NULL,
-		      DB_RECOVER, "cldbadm", false, 0, NULL))
+		      DB_RECOVER, "cldbadm", false,
+		      NULL, NULL, 0, 0, NULL))
 		goto err_dbopen;
 
 	switch (cld_adm.mode) {
@@ -143,7 +144,8 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state)
  */
 int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
 	      unsigned int env_flags, const char *errpfx, bool do_syslog,
-	      unsigned int flags, void (*cb)(enum db_event))
+	      GList *remotes, char *rep_host, unsigned short rep_port,
+	      int n_peers, void (*cb)(enum db_event))
 {
 
 	return 0;
diff --git a/cld/server.c b/cld/server.c
index 5a73e54..f248f92 100644
--- a/cld/server.c
+++ b/cld/server.c
@@ -45,6 +45,21 @@
 #define CLD_DEF_PIDFN	CLD_LOCAL_STATE_DIR "/run/cld.pid"
 #define CLD_DEF_DATADIR	CLD_LIBDIR "/cld/lib"
 
+enum {
+	CLD_DEF_REP_PORT	= 9081,
+
+	CLD_DEF_PEERS		= 5,
+	CLD_MIN_PEERS		= 3,
+	CLD_MAX_PEERS		= 400,		/* arbitrary "sanity" limit */
+};
+
+enum int_event_cmd {
+	IEC_NONE,		/* invalid / no command */
+	IEC_DUMP,		/* statistics dump */
+	IEC_NOW_MASTER,		/* replication state -> master */
+	IEC_NOW_SLAVE,		/* replication state -> slave */
+};
+
 const char *argp_program_version = PACKAGE_VERSION;
 
 static struct argp_option options[] = {
@@ -57,10 +72,18 @@ static struct argp_option options[] = {
 	  "Switch the log to standard error" },
 	{ "foreground", 'F', NULL, 0,
 	  "Run in foreground, do not fork" },
+	{ "myhost", 'm', "HOST", 0,
+	  "Force local hostname to HOST (def: autodetect)" },
 	{ "port", 'p', "PORT", 0,
 	  "Bind to TCP port PORT.  Default: " CLD_DEF_PORT },
 	{ "pid", 'P', "FILE", 0,
 	  "Write daemon process id to FILE.  Default: " CLD_DEF_PIDFN },
+	{ "rep-port", 'r', "PORT", 0,
+	  "bind replication engine to port PORT (def: 9081)" },
+	{ "remote", 'R', "HOST:PORT", 0,
+	  "Add a HOST:PORT pair to list of remote hosts.  Use this argument multiple times to build cell's peer list." },
+	{ "cell-size", 'S', "PEERS", 0,
+	  "Total number of PEERS in cell. (PEERS/2)+1 required for quorum.  Must be an odd number (def: 5)" },
 	{ "strict-free", 1001, NULL, 0,
 	  "For memory-checker runs.  When shutting down server, free local "
 	  "heap, rather than simply exit(2)ing and letting OS clean up." },
@@ -78,17 +101,25 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state);
 static const struct argp argp = { options, parse_opt, NULL, doc };
 
 static bool server_running = true;
-static bool dump_stats;
 static bool use_syslog = true;
 static bool strict_free = false;
 struct timeval current_time;
 
+static const char *state_name_cldb[ST_CLDBNUM] = {
+	"Init", "Open", "Active", "Master", "Slave"
+};
+
 struct server cld_srv = {
 	.data_dir		= CLD_DEF_DATADIR,
 	.pid_file		= CLD_DEF_PIDFN,
 	.port			= CLD_DEF_PORT,
+	.rep_port		= CLD_DEF_REP_PORT,
+	.n_peers		= CLD_DEF_PEERS,
+	.state_cldb		= ST_CLDB_INIT,
 };
 
+static void cldb_state_process(enum st_cldb new_state);
+static void stats_dump(void);
 static void ensure_root(void);
 static bool atcp_read(struct atcp_read_state *rst,
 		      void *buf, unsigned int buf_size,
@@ -117,6 +148,33 @@ static void applog(int prio, const char *fmt, ...)
 	va_end(ap);
 }
 
+/*
+ * Find out own hostname.
+ * This is needed for:
+ *  - finding the local domain and its SRV records
+ * Do this before our state machines start ticking, so we can quit with
+ * a meaningful message easily.
+ */
+static char *get_hostname(void)
+{
+	enum { hostsz = 64 };
+	char hostb[hostsz];
+	char *ret;
+
+	if (gethostname(hostb, hostsz-1) < 0) {
+		HAIL_ERR(&srv_log, "get_hostname: gethostname error (%d): %s",
+			 errno, strerror(errno));
+		exit(1);
+	}
+	hostb[hostsz-1] = 0;
+	if ((ret = strdup(hostb)) == NULL) {
+		HAIL_ERR(&srv_log, "get_hostname: no core (%ld)",
+			 (long)strlen(hostb));
+		exit(1);
+	}
+	return ret;
+}
+
 struct hail_log srv_log = {
 	.func = applog,
 };
@@ -863,6 +921,119 @@ static void net_close(void)
 	}
 }
 
+static void cldb_state_cb(enum db_event event)
+{
+
+	switch (event) {
+	case CLDB_EV_ELECTED:
+		/*
+		 * Safe to stop ignoring bogus client indication,
+		 * so unmute us by advancing the state.
+		 */
+		if (cld_srv.state_cldb == ST_CLDB_OPEN)
+			cld_srv.state_cldb = ST_CLDB_ACTIVE;
+		break;
+	case CLDB_EV_CLIENT:
+	case CLDB_EV_MASTER:
+		/*
+		 * This callback runs on the context of the replication
+		 * manager thread, and calling any of our functions thus
+		 * turns our program into a multi-threaded one. Instead
+		 * we do a loopbreak and postpone the processing.
+		 */
+		if (cld_srv.state_cldb != ST_CLDB_INIT &&
+		    cld_srv.state_cldb != ST_CLDB_OPEN) {
+			unsigned char cmd;
+
+			if (event == CLDB_EV_MASTER)
+				cmd = IEC_NOW_MASTER;
+			else
+				cmd = IEC_NOW_SLAVE;
+
+			/* wake up main loop */
+			write(cld_srv.ev_pipe[1], &cmd, 1);
+		}
+		break;
+	default:
+		HAIL_WARN(&srv_log, "API confusion with CLDB, event 0x%x",
+			  event);
+		cld_srv.state_cldb = ST_CLDB_OPEN;  /* wrong, stub for now */
+		break;
+	}
+}
+
+static void internal_event(int fd, short events, void *userdata)
+{
+	unsigned char cmd;
+	ssize_t rrc;
+
+	rrc = read(cld_srv.ev_pipe[0], &cmd, 1);
+	if (rrc < 0) {
+		HAIL_WARN(&srv_log, "pipe read error: %s", strerror(errno));
+		abort();
+	}
+	if (rrc < 1) {
+		HAIL_WARN(&srv_log, "pipe short read");
+		abort();
+	}
+
+	switch(cmd) {
+	case IEC_DUMP:
+		stats_dump();
+		break;
+	case IEC_NOW_MASTER:
+		if (cld_srv.state_cldb == ST_CLDB_MASTER)
+			break;
+
+		cldb_state_process(ST_CLDB_MASTER);
+		cld_srv.state_cldb = ST_CLDB_MASTER;
+
+		HAIL_DEBUG(&srv_log, "CLDB state > %s",
+			   state_name_cldb[cld_srv.state_cldb]);
+		break;
+	case IEC_NOW_SLAVE:
+		if (cld_srv.state_cldb == ST_CLDB_SLAVE)
+			break;
+
+		cldb_state_process(ST_CLDB_SLAVE);
+		cld_srv.state_cldb = ST_CLDB_SLAVE;
+
+		HAIL_DEBUG(&srv_log, "CLDB state > %s",
+			   state_name_cldb[cld_srv.state_cldb]);
+		break;
+	default:
+		HAIL_WARN(&srv_log, "%s BUG: command 0x%x", __func__, cmd);
+		break;
+	}
+}
+
+static void cldb_state_process(enum st_cldb new_state)
+{
+	unsigned int db_flags;
+
+	if ((new_state == ST_CLDB_MASTER || new_state == ST_CLDB_SLAVE) &&
+	    cld_srv.state_cldb == ST_CLDB_ACTIVE) {
+
+		db_flags = DB_CREATE | DB_THREAD;
+		if (cldb_up(&cld_srv.cldb, db_flags))
+			return;
+
+		ensure_root();
+
+		if (sess_load(cld_srv.sessions) != 0) {
+			HAIL_ERR(&srv_log, "session load failed. "
+			       "FIXME: I want error handling");
+			return;
+		}
+
+		add_chkpt_timer();
+	} else {
+		if (srv_log.verbose)
+		      HAIL_DEBUG(&srv_log,"unhandled state transition %d -> %d",
+			     cld_srv.state_cldb, new_state);
+      }
+}
+
 static int net_open_socket(int addr_fam, int sock_type, int sock_prot,
 			   int addr_len, void *addr_ptr)
 {
@@ -1067,8 +1238,8 @@ static void term_signal(int signo)
 
 static void stats_signal(int signo)
 {
-	dump_stats = true;
-	event_loopbreak();
+	static const unsigned char cmd = IEC_DUMP;
+	write(cld_srv.ev_pipe[1], &cmd, 1);
 }
 
 #define X(stat) \
@@ -1083,6 +1254,53 @@ static void stats_dump(void)
 
 #undef X
 
+static bool add_remote(const char *arg)
+{
+	size_t arg_len = strlen(arg);
+	int i, port;
+	struct db_remote *rp;
+	char *s_port, *colon;
+
+	if (!arg_len)
+		return false;
+
+	/* verify no whitespace in input */
+	for (i = 0; i < arg_len; i++)
+		if (isspace(arg[i]))
+			return false;
+
+	/* find colon delimiter */
+	colon = strchr(arg, ':');
+	if (!colon || (colon == arg))
+		return false;
+	s_port = colon + 1;
+
+	/* parse replication port number */
+	port = atoi(s_port);
+	if (port < 1 || port > 65535)
+		return false;
+
+	/* alloc and fill in remote-host record */
+	rp = malloc(sizeof(*rp));
+	if (!rp)
+		return false;
+	
+	rp->port = port;
+	rp->host = strdup(arg);
+	if (!rp->host) {
+		free(rp);
+		return false;
+	}
+
+	/* truncate string down to simply hostname portion */
+	rp->host[colon - arg] = 0;
+
+	/* add remote host to global list */
+	cld_srv.rep_remotes = g_list_append(cld_srv.rep_remotes, rp);
+
+	return true;
+}
+
 static error_t parse_opt (int key, char *arg, struct argp_state *state)
 {
 	int v;
@@ -1108,6 +1326,15 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
 	case 'F':
 		cld_srv.flags |= SFL_FOREGROUND;
 		break;
+	case 'm':
+		if ((strlen(arg) > 3) && (strlen(arg) < 64) &&
+		    (strchr(arg, '.')))
+			cld_srv.force_myhost = arg;
+		else {
+			fprintf(stderr, "invalid myhost: '%s'\n", arg);
+			argp_usage(state);
+		}
+		break;
 	case 'p':
 		/*
 		 * We do not permit "0" as an argument in order to be safer
@@ -1126,6 +1353,31 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
 	case 'P':
 		cld_srv.pid_file = arg;
 		break;
+	case 'r':
+		if (atoi(arg) > 0 && atoi(arg) < 65536)
+			cld_srv.rep_port = atoi(arg);
+		else {
+			fprintf(stderr, "invalid rep-port: '%s'\n", arg);
+			argp_usage(state);
+		}
+		break;
+	case 'R':
+		if (!add_remote(arg)) {
+			fprintf(stderr, "invalid remote host:port: '%s'\n", arg);
+			argp_usage(state);
+		}
+		break;
+	case 'S': {
+		int n_peers = atoi(arg);
+		if ((n_peers >= CLD_MIN_PEERS) && (n_peers < CLD_MAX_PEERS) &&
+		    (n_peers & 0x01))
+			cld_srv.n_peers = atoi(arg);
+		else {
+			fprintf(stderr, "invalid peer count: '%s'\n", arg);
+			argp_usage(state);
+		}
+		break;
+		}
 
 	case 1001:			/* --strict-free */
 		strict_free = true;
@@ -1153,11 +1405,6 @@ static int main_loop(void)
 		event_dispatch();
 
 		gettimeofday(&current_time, NULL);
-
-		if (dump_stats) {
-			dump_stats = false;
-			stats_dump();
-		}
 	}
 
 	return 0;
@@ -1166,7 +1413,7 @@ static int main_loop(void)
 int main (int argc, char *argv[])
 {
 	error_t aprc;
-	int rc = 1;
+	int rc = 1, env_flags;
 
 	INIT_LIST_HEAD(&cld_srv.sockets);
 
@@ -1195,6 +1442,20 @@ int main (int argc, char *argv[])
 
 	cld_srv.evbase_main = event_init();
 
+	if (cld_srv.force_myhost)
+		cld_srv.myhost = strdup(cld_srv.force_myhost);
+	else
+		cld_srv.myhost = get_hostname();
+
+	if (srv_log.verbose)
+		HAIL_DEBUG(&srv_log, "our hostname: %s", cld_srv.myhost);
+
+	/* remotes file should list all in peer group, except for us */
+	if ((cld_srv.n_peers - 1) != g_list_length(cld_srv.rep_remotes)) {
+		HAIL_ERR(&srv_log, "n_peers does not match remotes file loaded");
+		goto err_out;
+	}
+
 	if (!(cld_srv.flags & SFL_FOREGROUND) && (daemon(1, !use_syslog) < 0)) {
 		syslogerr("daemon");
 		goto err_out;
@@ -1215,16 +1476,7 @@ int main (int argc, char *argv[])
 	signal(SIGTERM, term_signal);
 	signal(SIGUSR1, stats_signal);
 
-	if (cldb_init(&cld_srv.cldb, cld_srv.data_dir, NULL,
-		      DB_CREATE | DB_THREAD | DB_RECOVER,
-		      "cld", use_syslog,
-		      DB_CREATE | DB_THREAD, NULL))
-		exit(1);
-
-	ensure_root();
-
 	evtimer_set(&cld_srv.chkpt_timer, cldb_checkpoint, NULL);
-	add_chkpt_timer();
 
 	rc = 1;
 
@@ -1232,18 +1484,38 @@ int main (int argc, char *argv[])
 	if (!cld_srv.sessions)
 		goto err_out_pid;
 
-	if (sess_load(cld_srv.sessions) != 0)
+	if (pipe(cld_srv.ev_pipe) < 0) {
+		syslogerr("pipe");
 		goto err_out_pid;
+	}
 
 	/* set up server networking */
 	rc = net_open();
 	if (rc)
 		goto err_out_pid;
 
+	event_set(&cld_srv.pipe_ev, cld_srv.ev_pipe[0], EV_READ | EV_PERSIST,
+		  internal_event, NULL);
+	if (event_add(&cld_srv.pipe_ev, NULL) < 0)
+		goto err_out_pid;
+
+	env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
+	if (cldb_init(&cld_srv.cldb, cld_srv.data_dir, NULL,
+		    env_flags, "cld", true,
+		    cld_srv.rep_remotes,
+		    cld_srv.myhost, cld_srv.rep_port,
+		    cld_srv.n_peers, cldb_state_cb)) {
+		HAIL_ERR(&srv_log, "Failed to open CLDB, limping");
+	} else
+		cld_srv.state_cldb = ST_CLDB_OPEN;
+
 	HAIL_INFO(&srv_log, "initialized: %s%s%s",
 		  srv_log.debug ? "debug" : "nodebug",
 		  srv_log.verbose ? ", verbose" : "",
 		  strict_free ? ", strict-free" : "");
+	HAIL_INFO(&srv_log, "replication: %s:%u",
+		cld_srv.myhost,
+		cld_srv.rep_port);
 
 	/*
 	 * execute main loop
@@ -1258,7 +1530,8 @@ int main (int argc, char *argv[])
 
 	if (cld_srv.cldb.up)
 		cldb_down(&cld_srv.cldb);
-	cldb_fini(&cld_srv.cldb);
+	if (cld_srv.state_cldb >= ST_CLDB_OPEN)
+		cldb_fini(&cld_srv.cldb);
 
 err_out_pid:
 	unlink(cld_srv.pid_file);
diff --git a/include/cldc.h b/include/cldc.h
index f98d151..c4f0a64 100644
--- a/include/cldc.h
+++ b/include/cldc.h
@@ -225,8 +225,7 @@ extern void cldc_copts_get_metadata(const struct cldc_call_opts *copts,
 
 /* cldc-tcp */
 extern void cldc_tcp_free(struct cldc_tcp *tcp);
-extern int cldc_tcp_new(const char *hostname, int port,
-		 struct cldc_tcp **tcp_out);
+extern int cldc_tcp_new(GList *host_list, struct cldc_tcp **tcp_out);
 extern int cldc_tcp_receive_pkt_data(struct cldc_tcp *tcp);
 extern int cldc_tcp_pkt_send(void *private,
 			  const void *addr, size_t addrlen,
diff --git a/include/ncld.h b/include/ncld.h
index 21b6e36..e46a4e7 100644
--- a/include/ncld.h
+++ b/include/ncld.h
@@ -30,7 +30,6 @@
 #include <cldc.h>
 
 struct ncld_sess {
-	char			*host;
 	unsigned short		port;
 	GMutex			*mutex;
 	GCond			*cond;
@@ -71,7 +70,7 @@ struct ncld_read {
 	int		errc;
 };
 
-extern struct ncld_sess *ncld_sess_open(const char *host, int port,
+extern struct ncld_sess *ncld_sess_open(GList *host_list,
 	int *error, void (*event)(void *, unsigned int), void *ev_arg,
 	const char *cld_user, const char *cld_key, struct hail_log *log);
 extern struct ncld_fh *ncld_open(struct ncld_sess *s, const char *fname,
diff --git a/lib/cldc-tcp.c b/lib/cldc-tcp.c
index 63a753b..96d78e2 100644
--- a/lib/cldc-tcp.c
+++ b/lib/cldc-tcp.c
@@ -42,7 +42,7 @@ void cldc_tcp_free(struct cldc_tcp *tcp)
 	free(tcp);
 }
 
-int cldc_tcp_new(const char *hostname, int port,
+static int cldc_tcp_new_cxn(const char *hostname, int port,
 		 struct cldc_tcp **tcp_out)
 {
 	struct cldc_tcp *tcp;
@@ -99,6 +99,25 @@ int cldc_tcp_new(const char *hostname, int port,
 	return 0;
 }
 
+int cldc_tcp_new(GList *host_list, struct cldc_tcp **tcp_out)
+{
+	struct cldc_host *hp;
+	GList *tmp;
+	int rc = -ENOENT;
+
+	tmp = host_list;
+	while (tmp) {
+		hp = tmp->data;
+		tmp = tmp->next;
+
+		rc = cldc_tcp_new_cxn(hp->host, hp->port, tcp_out);
+		if (rc == 0)
+			break;
+	}
+
+	return rc;
+}
+
 int cldc_tcp_receive_pkt_data(struct cldc_tcp *tcp)
 {
 	static char buf[CLD_RAW_MSG_SZ];	/* BUG: static buf */
diff --git a/lib/cldc.c b/lib/cldc.c
index f0c3b59..8d99bf2 100644
--- a/lib/cldc.c
+++ b/lib/cldc.c
@@ -1326,6 +1326,7 @@ char *cldc_dirent_name(struct cld_dirent_cur *dc)
 	return s;
 }
 
+#if 0
 /*
  * On error, return the code (not negated code like a kernel function would).
  */
@@ -1374,6 +1375,7 @@ static int ncld_gethost(char **hostp, unsigned short *portp,
 
 	return 0;
 }
+#endif /* 0 */
 
 static void ncld_tcp_timer_event(struct cld_timer *timer)
 {
@@ -1600,8 +1602,7 @@ static int ncld_wait_session(struct ncld_sess *nsess)
  * this function returns, with the session going down, for example.
  * This is kind of dirty, but oh well. Maybe we'll fix this later.
  *
- * @param host Host name (NULL if resolving SRV records)
- * @param port Port
+ * @param host_list Host list
  * @param error Buffer for the error code
  * @param ev_func Session event function (ok to be NULL)
  * @param ev_arg User-supplied argument to the session event function
@@ -1609,7 +1610,7 @@ static int ncld_wait_session(struct ncld_sess *nsess)
  * @param cld_key The user key to be used to authentication
  * @param log The application log descriptor (ok to be NULL)
  */
-struct ncld_sess *ncld_sess_open(const char *host, int port, int *error,
+struct ncld_sess *ncld_sess_open(GList *host_list, int *error,
 				 void (*ev_func)(void *, unsigned int),
 				 void *ev_arg,
 				 const char *cld_user, const char *cld_key,
@@ -1643,16 +1644,6 @@ struct ncld_sess *ncld_sess_open(const char *host, int port, int *error,
 	if (!nsess->cond)
 		goto out_cond;
 
-	if (!host) {
-		err = ncld_getsrv(&nsess->host, &nsess->port, log);
-		if (err)
-			goto out_srv;
-	} else {
-		err = ncld_gethost(&nsess->host, &nsess->port, host, port);
-		if (err)
-			goto out_srv;
-	}
-
 	nsess->event = ev_func;
 	nsess->event_arg = ev_arg;
 
@@ -1661,7 +1652,7 @@ struct ncld_sess *ncld_sess_open(const char *host, int port, int *error,
 		goto out_pipe_to;
 	}
 
-	if (cldc_tcp_new(nsess->host, nsess->port, &nsess->tcp)) {
+	if (cldc_tcp_new(host_list, &nsess->tcp)) {
 		err = 1023;
 		goto out_tcp;
 	}
@@ -1704,8 +1695,6 @@ out_tcp:
 	close(nsess->to_thread[0]);
 	close(nsess->to_thread[1]);
 out_pipe_to:
-	free(nsess->host);
-out_srv:
 	g_cond_free(nsess->cond);
 out_cond:
 	g_mutex_free(nsess->mutex);
@@ -2273,7 +2262,6 @@ void ncld_sess_close(struct ncld_sess *nsess)
 	close(nsess->to_thread[1]);
 	g_cond_free(nsess->cond);
 	g_mutex_free(nsess->mutex);
-	free(nsess->host);
 	free(nsess);
 }
 
diff --git a/tools/cldcli.c b/tools/cldcli.c
index d347bf3..5321986 100644
--- a/tools/cldcli.c
+++ b/tools/cldcli.c
@@ -682,7 +682,6 @@ static void prompt(void)
 int main (int argc, char *argv[])
 {
 	char linebuf[CLD_PATH_MAX + 1];
-	struct cldc_host *dr;
 	error_t aprc;
 	int error;
 
@@ -720,9 +719,8 @@ int main (int argc, char *argv[])
 
 	printf("Waiting for session startup...\n");
 	fflush(stdout);
-	dr = host_list->data;
 
-	nsess = ncld_sess_open(dr->host, dr->port, &error, sess_event, NULL,
+	nsess = ncld_sess_open(host_list, &error, sess_event, NULL,
 			     our_user, our_user, &cli_log);
 	if (!nsess) {
 		if (error < 1000) {
diff --git a/tools/cldfuse.c b/tools/cldfuse.c
index 2cba917..395f4a3 100644
--- a/tools/cldfuse.c
+++ b/tools/cldfuse.c
@@ -348,7 +348,6 @@ static int cldfuse_process_arg(void *data, const char *arg, int key,
 int main(int argc, char *argv[])
 {
 	struct fuse_args args = FUSE_ARGS_INIT(argc, argv);
-	struct cldc_host *dr;
 	int error;
 
 	if (fuse_opt_parse(&args, NULL, cldfuse_opts, cldfuse_process_arg)) {
@@ -372,9 +371,7 @@ int main(int argc, char *argv[])
 		}
 	}
 
-	dr = param.host_list->data;
-
-	sess = ncld_sess_open(dr->host, dr->port, &error, sess_event, NULL,
+	sess = ncld_sess_open(param.host_list, &error, sess_event, NULL,
 			     "cldfuse", "cldfuse", &cldfuse_log);
 	if (!sess) {
 		if (error < 1000) {
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [Fedora Cloud]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux