I just updated the current CLD replication code to the latest upstream
cld.git code. It now lives on the "replica" branch of
git://git.kernel.org/pub/scm/daemon/cld/cld.git

 server/cld.h      |   20 ++++
 server/cldb.c     |   69 +++++++++++--
 server/cldb.h     |    9 ++-
 server/cldbadm.c  |    8 +-
 server/server.c   |  287 ++++++++++++++++++++++++++++++++++++++++++++++++++---
 test/pid-exists   |   24 +++--
 test/prep-db      |   19 ++--
 test/start-daemon |   24 ++++-
 test/stop-daemon  |   18 ++--
 9 files changed, 419 insertions(+), 59 deletions(-)

Jeff Garzik (1):
      cld replication work-in-progress.

diff --git a/server/cld.h b/server/cld.h index da26862..84d2348 100644 --- a/server/cld.h +++ b/server/cld.h @@ -94,6 +94,15 @@ struct msg_params { size_t msg_len; }; +enum st_cldb { + ST_CLDB_INIT, + ST_CLDB_OPEN, + ST_CLDB_ACTIVE, + ST_CLDB_MASTER, + ST_CLDB_SLAVE, + ST_CLDBNUM +}; + struct server_stats { unsigned long poll; /* num. polls */ unsigned long event; /* events dispatched */ @@ -115,6 +124,17 @@ struct server { char *port; /* bind port, NULL means auto */ char *port_file; /* Port file to write */ + unsigned short rep_port; /* db4 replication port */ + + char *myhost; + char *force_myhost; + GList *rep_remotes; + + unsigned int n_peers; /* total peers in cell */ + + int rep_pipe[2]; + + enum st_cldb state_cldb, state_cldb_new; struct cldb cldb; /* database info */ diff --git a/server/cldb.c b/server/cldb.c index 2f012e5..3f080e1 100644 --- a/server/cldb.c +++ b/server/cldb.c @@ -26,8 +26,6 @@ #include <cld-private.h> #include "cld.h" -static int cldb_up(struct cldb *cldb, unsigned int flags); - /* * db4 page sizes for our various databases. Filesystem block size * is recommended, so 4096 was chosen (default ext3 block size). 
@@ -203,6 +201,30 @@ err_out: return -EIO; } +static int add_remote_sites(DB_ENV *dbenv, GList *remotes, int *nsites) +{ + int rc; + struct db_remote *rp; + GList *tmp; + + *nsites = 0; + for (tmp = remotes; tmp; tmp = tmp->next) { + rp = tmp->data; + + rc = dbenv->repmgr_add_remote_site(dbenv, rp->host, rp->port, + NULL, 0); + if (rc) { + dbenv->err(dbenv, rc, + "dbenv->add.remote.site host %s port %u", + rp->host, rp->port); + return rc; + } + (*nsites)++; + } + + return 0; +} + static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info) { struct cldb *cldb = dbenv->app_private; @@ -230,12 +252,13 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info) int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password, unsigned int env_flags, const char *errpfx, bool do_syslog, - unsigned int flags, void (*cb)(enum db_event)) + GList *remotes, char *rep_host, unsigned short rep_port, + int n_peers, void (*cb)(enum db_event)) { - int rc; + int rc, nsites = 0; DB_ENV *dbenv; - cldb->is_master = true; + cldb->is_master = false; cldb->home = db_home; cldb->state_cb = cb; @@ -282,25 +305,55 @@ int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password, cldb->keyed = true; } + rc = dbenv->repmgr_set_local_site(dbenv, rep_host, rep_port, 0); + if (rc) { + dbenv->err(dbenv, rc, "dbenv->set_local_site"); + goto err_out; + } + rc = dbenv->set_event_notify(dbenv, db4_event); if (rc) { dbenv->err(dbenv, rc, "dbenv->set_event_notify"); goto err_out; } + rc = dbenv->rep_set_priority(dbenv, 100); + if (rc) { + dbenv->err(dbenv, rc, "dbenv->rep_set_priority"); + goto err_out; + } + + rc = dbenv->rep_set_nsites(dbenv, n_peers); + if (rc) { + dbenv->err(dbenv, rc, "dbenv->rep_set_nsites"); + goto err_out; + } + + rc = dbenv->repmgr_set_ack_policy(dbenv, DB_REPMGR_ACKS_QUORUM); + if (rc) { + dbenv->err(dbenv, rc, "dbenv->rep_ack_policy"); + goto err_out; + } + /* init DB transactional environment, stored in directory 
db_home */ env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL; - env_flags |= DB_INIT_TXN; + env_flags |= DB_INIT_TXN | DB_INIT_REP; rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR); if (rc) { dbenv->err(dbenv, rc, "dbenv->open"); goto err_out; } - rc = cldb_up(cldb, flags); + rc = add_remote_sites(dbenv, remotes, &nsites); if (rc) goto err_out; + rc = dbenv->repmgr_start(dbenv, 2, DB_REP_ELECTION); + if (rc) { + dbenv->err(dbenv, rc, "dbenv->repmgr_start"); + goto err_out; + } + return 0; err_out: @@ -311,7 +364,7 @@ err_out: /* * open databases */ -static int cldb_up(struct cldb *cldb, unsigned int flags) +int cldb_up(struct cldb *cldb, unsigned int flags) { DB_ENV *dbenv = cldb->env; int rc; diff --git a/server/cldb.h b/server/cldb.h index ac108ce..dbe9ea0 100644 --- a/server/cldb.h +++ b/server/cldb.h @@ -108,6 +108,11 @@ enum db_event { CLDB_EV_NONE, CLDB_EV_CLIENT, CLDB_EV_MASTER, CLDB_EV_ELECTED }; +struct db_remote { /* remotes for cldb_init */ + char *host; + unsigned short port; +}; + struct cldb { bool is_master; bool keyed; /* using encryption? 
*/ @@ -134,7 +139,9 @@ struct cldb { extern int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password, unsigned int env_flags, const char *errpfx, bool do_syslog, - unsigned int flags, void (*cb)(enum db_event)); + GList *remotes, char *rep_host, unsigned short rep_port, + int n_peers, void (*cb)(enum db_event)); +extern int cldb_up(struct cldb *cldb, unsigned int flags); extern void cldb_down(struct cldb *cldb); extern void cldb_fini(struct cldb *cldb); diff --git a/server/cldbadm.c b/server/cldbadm.c index 37e8e36..9342f66 100644 --- a/server/cldbadm.c +++ b/server/cldbadm.c @@ -78,7 +78,8 @@ int main(int argc, char *argv[]) } if (cldb_init(&cld_adm.cldb, cld_adm.data_dir, NULL, - DB_RECOVER, "cldbadm", false, 0, NULL)) + DB_RECOVER, "cldbadm", false, + NULL, NULL, 0, 0, NULL)) goto err_dbopen; switch (cld_adm.mode) { @@ -142,8 +143,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) * Stubs for contents of cldb.c */ int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password, - unsigned int env_flags, const char *errpfx, bool do_syslog, - unsigned int flags, void (*cb)(enum db_event)) + unsigned int env_flags, const char *errpfx, bool do_syslog, + GList *remotes, char *rep_host, unsigned short rep_port, + int n_peers, void (*cb)(enum db_event)) { return 0; diff --git a/server/server.c b/server/server.c index 39d1a54..92c91cb 100644 --- a/server/server.c +++ b/server/server.c @@ -29,6 +29,7 @@ #include <errno.h> #include <syslog.h> #include <locale.h> +#include <ctype.h> #include <argp.h> #include <netdb.h> #include <signal.h> @@ -48,6 +49,12 @@ const char *argp_program_version = PACKAGE_VERSION; enum { CLD_RAW_MSG_SZ = 4096, + + CLD_DEF_REP_PORT = 9081, + + CLD_DEF_PEERS = 5, + CLD_MIN_PEERS = 3, + CLD_MAX_PEERS = 400, /* arbitrary "sanity" limit */ }; static struct argp_option options[] = { @@ -60,10 +67,18 @@ static struct argp_option options[] = { "Switch the log to standard error" }, { 
"foreground", 'F', NULL, 0, "Run in foreground, do not fork" }, + { "myhost", 'm', "HOST", 0, + "Force local hostname to HOST (def: autodetect)" }, { "port", 'p', "PORT", 0, "Bind to UDP port PORT. Default: " CLD_DEF_PORT }, { "pid", 'P', "FILE", 0, "Write daemon process id to FILE. Default: " CLD_DEF_PIDFN }, + { "rep-port", 'r', "PORT", 0, + "bind replication engine to port PORT (def: 9081)" }, + { "remote", 'R', "HOST:PORT", 0, + "Add a HOST:PORT pair to list of remote hosts. Use this argument multiple times to build cell's peer list." }, + { "cell-size", 'S', "PEERS", 0, + "Total number of PEERS in cell. (PEERS/2)+1 required for quorum. Must be an odd number (def: 5)" }, { "strict-free", 1001, NULL, 0, "For memory-checker runs. When shutting down server, free local " "heap, rather than simply exit(2)ing and letting OS clean up." }, @@ -87,10 +102,15 @@ static bool strict_free = false; int debugging = 0; struct timeval current_time; +static const char *state_name_cldb[ST_CLDBNUM] = { + "Init", "Open", "Active", "Master", "Slave" +}; struct server cld_srv = { - .data_dir = CLD_DEF_DATADIR, - .pid_file = CLD_DEF_PIDFN, + .data_dir = "/spare/tmp/cld/lib", + .pid_file = "/var/run/cld.pid", .port = CLD_DEF_PORT, + .rep_port = CLD_DEF_REP_PORT, + .n_peers = CLD_DEF_PEERS, }; static void ensure_root(void); @@ -116,6 +136,33 @@ void applog(int prio, const char *fmt, ...) va_end(ap); } +/* + * Find out own hostname. + * This is needed for: + * - finding the local domain and its SRV records + * Do this before our state machines start ticking, so we can quit with + * a meaningful message easily. 
+ */ +static char *get_hostname(void) +{ + enum { hostsz = 64 }; + char hostb[hostsz]; + char *ret; + + if (gethostname(hostb, hostsz-1) < 0) { + applog(LOG_ERR, "get_hostname: gethostname error (%d): %s", + errno, strerror(errno)); + exit(1); + } + hostb[hostsz-1] = 0; + if ((ret = strdup(hostb)) == NULL) { + applog(LOG_ERR, "get_hostname: no core (%ld)", + (long)strlen(hostb)); + exit(1); + } + return ret; +} + int udp_tx(int sock_fd, struct sockaddr *addr, socklen_t addr_len, const void *data, size_t data_len) { @@ -623,6 +670,55 @@ static void net_close(void) } } +static void cldb_state_cb(enum db_event event) +{ + + switch (event) { + case CLDB_EV_ELECTED: + /* + * Safe to stop ignoring bogus client indication, + * so unmute us by advancing the state. + */ + if (cld_srv.state_cldb == ST_CLDB_OPEN) + cld_srv.state_cldb = ST_CLDB_ACTIVE; + break; + case CLDB_EV_CLIENT: + case CLDB_EV_MASTER: + /* + * This callback runs on the context of the replication + * manager thread, and calling any of our functions thus + * turns our program into a multi-threaded one. Instead + * we do a loopbreak and postpone the processing. 
+ */ + if (cld_srv.state_cldb != ST_CLDB_INIT && + cld_srv.state_cldb != ST_CLDB_OPEN) { + char c = 0x42; + + if (event == CLDB_EV_MASTER) + cld_srv.state_cldb_new = ST_CLDB_MASTER; + else + cld_srv.state_cldb_new = ST_CLDB_SLAVE; + if (debugging) { + applog(LOG_DEBUG, "CLDB state > %s", + state_name_cldb[cld_srv.state_cldb_new]); + } + + /* wake up main loop */ + write(cld_srv.rep_pipe[1], &c, 1); + } + break; + default: + applog(LOG_WARNING, "API confusion with CLDB, event 0x%x", event); + cld_srv.state_cldb = ST_CLDB_OPEN; /* wrong, stub for now */ + cld_srv.state_cldb_new = ST_CLDB_INIT; + } +} + +static bool noop_event(int fd, short events, void *userdata) +{ + return true; /* continue main loop; do NOT terminate server */ +} + static int net_open_socket(int addr_fam, int sock_type, int sock_prot, int addr_len, void *addr_ptr) { @@ -805,6 +901,33 @@ static int net_open(void) return net_open_known(cld_srv.port); } +static void cldb_state_process(enum st_cldb new_state) +{ + unsigned int db_flags; + + if ((new_state == ST_CLDB_MASTER || new_state == ST_CLDB_SLAVE) && + cld_srv.state_cldb == ST_CLDB_ACTIVE) { + + db_flags = DB_CREATE | DB_THREAD; + if (cldb_up(&cld_srv.cldb, db_flags)) + return; + + ensure_root(); + + if (sess_load(cld_srv.sessions) != 0) { + applog(LOG_ERR, "session load failed. 
" + "FIXME: I want error handling"); + return; + } + + add_chkpt_timer(); + } else { + if (debugging) + applog(LOG_DEBUG, "unhandled state transition %d -> %d", + cld_srv.state_cldb, new_state); + } +} + static void segv_signal(int signo) { applog(LOG_ERR, "SIGSEGV"); @@ -829,10 +952,60 @@ static void stats_dump(void) X(poll); X(event); X(garbage); + + applog(LOG_INFO, "State: CLDB %s", + state_name_cldb[cld_srv.state_cldb]); } #undef X +static bool add_remote(const char *arg) +{ + size_t arg_len = strlen(arg); + int i, port; + struct db_remote *rp; + char *s_port, *colon; + + if (!arg_len) + return false; + + /* verify no whitespace in input */ + for (i = 0; i < arg_len; i++) + if (isspace(arg[i])) + return false; + + /* find colon delimiter */ + colon = strchr(arg, ':'); + if (!colon || (colon == arg)) + return false; + s_port = colon + 1; + + /* parse replication port number */ + port = atoi(s_port); + if (port < 1 || port > 65535) + return false; + + /* alloc and fill in remote-host record */ + rp = malloc(sizeof(*rp)); + if (!rp) + return false; + + rp->port = port; + rp->host = strdup(arg); + if (!rp->host) { + free(rp); + return false; + } + + /* truncate string down to simply hostname portion */ + rp->host[colon - arg] = 0; + + /* add remote host to global list */ + cld_srv.rep_remotes = g_list_append(cld_srv.rep_remotes, rp); + + return true; +} + static error_t parse_opt (int key, char *arg, struct argp_state *state) { switch(key) { @@ -853,6 +1026,15 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state) case 'F': cld_srv.flags |= SFL_FOREGROUND; break; + case 'm': + if ((strlen(arg) > 3) && (strlen(arg) < 64) && + (strchr(arg, '.'))) + cld_srv.force_myhost = arg; + else { + fprintf(stderr, "invalid myhost: '%s'\n", arg); + argp_usage(state); + } + break; case 'p': /* * We do not permit "0" as an argument in order to be safer @@ -871,6 +1053,31 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state) case 'P': 
cld_srv.pid_file = arg; break; + case 'r': + if (atoi(arg) > 0 && atoi(arg) < 65536) + cld_srv.rep_port = atoi(arg); + else { + fprintf(stderr, "invalid rep-port: '%s'\n", arg); + argp_usage(state); + } + break; + case 'R': + if (!add_remote(arg)) { + fprintf(stderr, "invalid remote host:port: '%s'\n", arg); + argp_usage(state); + } + break; + case 'S': { + int n_peers = atoi(arg); + if ((n_peers >= CLD_MIN_PEERS) && (n_peers < CLD_MAX_PEERS) && + (n_peers & 0x01)) + cld_srv.n_peers = atoi(arg); + else { + fprintf(stderr, "invalid peer count: '%s'\n", arg); + argp_usage(state); + } + break; + } case 1001: /* --strict-free */ strict_free = true; @@ -960,6 +1167,12 @@ static int main_loop(void) } next_timeout = timers_run(); + + if (cld_srv.state_cldb_new != ST_CLDB_INIT && + cld_srv.state_cldb_new != cld_srv.state_cldb) { + cldb_state_process(cld_srv.state_cldb_new); + cld_srv.state_cldb = cld_srv.state_cldb_new; + } } return 0; @@ -968,7 +1181,10 @@ static int main_loop(void) int main (int argc, char *argv[]) { error_t aprc; - int rc = 1; + int rc = 1, env_flags; + + cld_srv.state_cldb = + cld_srv.state_cldb_new = ST_CLDB_INIT; /* isspace() and strcasecmp() consistency requires this */ setlocale(LC_ALL, "C"); @@ -993,6 +1209,20 @@ int main (int argc, char *argv[]) if (use_syslog) openlog(PROGRAM_NAME, LOG_PID, LOG_LOCAL3); + if (cld_srv.force_myhost) + cld_srv.myhost = strdup(cld_srv.force_myhost); + else + cld_srv.myhost = get_hostname(); + + if (debugging) + applog(LOG_DEBUG, "our hostname: %s", cld_srv.myhost); + + /* remotes file should list all in peer group, except for us */ + if ((cld_srv.n_peers - 1) != g_list_length(cld_srv.rep_remotes)) { + applog(LOG_ERR, "n_peers does not match remotes file loaded"); + goto err_out; + } + if (!(cld_srv.flags & SFL_FOREGROUND) && (daemon(1, !use_syslog) < 0)) { syslogerr("daemon"); goto err_out; @@ -1013,17 +1243,8 @@ int main (int argc, char *argv[]) signal(SIGTERM, term_signal); signal(SIGUSR1, stats_signal); - if 
(cldb_init(&cld_srv.cldb, cld_srv.data_dir, NULL, - DB_CREATE | DB_THREAD | DB_RECOVER, - "cld", use_syslog, - DB_CREATE | DB_THREAD, NULL)) - exit(1); - - ensure_root(); - timer_init(&cld_srv.chkpt_timer, "db4-checkpoint", cldb_checkpoint, NULL); - add_chkpt_timer(); rc = 1; @@ -1036,17 +1257,52 @@ int main (int argc, char *argv[]) !cld_srv.polls) goto err_out_pid; - if (sess_load(cld_srv.sessions) != 0) - goto err_out_pid; + if (pipe(cld_srv.rep_pipe) < 0) { + syslogerr("pipe"); + goto err_out; + } /* set up server networking */ rc = net_open(); if (rc) goto err_out_pid; + { + struct pollfd pfd; + struct server_poll sp; + + /* + * add pipe to poll list, after doing so with our net sockets + */ + sp.fd = cld_srv.rep_pipe[0]; + sp.cb = noop_event; + sp.userdata = NULL; + g_array_append_val(cld_srv.poll_data, sp); + + pfd.fd = cld_srv.rep_pipe[0]; + pfd.events = POLLIN; + pfd.revents = 0; + g_array_append_val(cld_srv.polls, pfd); + } + + env_flags = DB_RECOVER | DB_CREATE | DB_THREAD; + if (cldb_init(&cld_srv.cldb, cld_srv.data_dir, NULL, + env_flags, "cld", true, + cld_srv.rep_remotes, + cld_srv.myhost, cld_srv.rep_port, + cld_srv.n_peers, cldb_state_cb)) { + applog(LOG_ERR, "Failed to open CLDB, limping"); + } else { + cld_srv.state_cldb = + cld_srv.state_cldb_new = ST_CLDB_OPEN; + } + applog(LOG_INFO, "initialized: dbg %u%s", debugging, strict_free ? ", strict-free" : ""); + applog(LOG_INFO, "replication: %s:%u", + cld_srv.myhost, + cld_srv.rep_port); /* * execute main loop @@ -1060,7 +1316,8 @@ int main (int argc, char *argv[]) if (cld_srv.cldb.up) cldb_down(&cld_srv.cldb); - cldb_fini(&cld_srv.cldb); + if (cld_srv.state_cldb >= ST_CLDB_OPEN) + cldb_fini(&cld_srv.cldb); err_out_pid: unlink(cld_srv.pid_file); diff --git a/test/pid-exists b/test/pid-exists index 34c7258..294f7dd 100755 --- a/test/pid-exists +++ b/test/pid-exists @@ -1,15 +1,17 @@ #!/bin/sh -if [ ! -f cld.pid ] -then - echo "pid file not found." >&2 - exit 1 -fi - -if [ ! 
-f cld.port ] -then - echo "port file not found." >&2 - exit 1 -fi +for n in 1 2 3 +do + if [ ! -f cld$n.pid ] + then + echo "cld$n.pid not found." >&2 + exit 1 + fi + if [ ! -f cld$n.port ] + then + echo "cld$n.port file not found." >&2 + exit 1 + fi +done exit 0 diff --git a/test/prep-db b/test/prep-db index 353ca4a..3e4fb60 100755 --- a/test/prep-db +++ b/test/prep-db @@ -2,13 +2,16 @@ DATADIR=data -mkdir -p $DATADIR - -if [ ! -d $DATADIR ] -then - rm -rf $DATADIR - echo "test database dir not found." - exit 1 -fi +for n in 1 2 3 +do + mkdir -p $DATADIR/n$n/data + + if [ ! -d $DATADIR/n$n/data ] + then + rm -rf $DATADIR + echo "test database dir for node $n not found." + exit 1 + fi +done exit 0 diff --git a/test/start-daemon b/test/start-daemon index 15d4546..3ea77f5 100755 --- a/test/start-daemon +++ b/test/start-daemon @@ -1,16 +1,32 @@ #!/bin/sh -if [ -f cld.pid ] +if [ -f cld1.pid -o -f cld2.pid -o -f cld3.pid ] then - echo "pid file found. daemon still running?" + echo "pid file found. daemons still running?" exit 1 fi ## Static port # ../server/cld -P cld.pid -d "$PWD/data" -p 18181 --port-file=cld.port -E ## Dynamic port -../server/cld -P cld.pid -d "$PWD/data" -p auto --port-file=cld.port -E +../server/cld -P cld1.pid -d "$PWD/data/n1/data" -p auto --port-file=cld1.port \ + -E -D 2 -S 3 \ + -m localhost.localdomain \ + -R localhost.localdomain:19182 \ + -R localhost.localdomain:19183 +../server/cld -P cld2.pid -d "$PWD/data/n2/data" -p auto --port-file=cld2.port \ + -E -D 2 -S 3 \ + -m localhost.localdomain \ + -R localhost.localdomain:19181 \ + -R localhost.localdomain:19183 +../server/cld -P cld3.pid -d "$PWD/data/n3/data" -p auto --port-file=cld3.port \ + -E -D 2 -S 3 \ + -m localhost.localdomain \ + -R localhost.localdomain:19181 \ + -R localhost.localdomain:19182 +sleep 1 -sleep 3 +echo " start-daemon: Waiting 20s, for daemons to start up..." 
+sleep 20 exit 0 diff --git a/test/stop-daemon b/test/stop-daemon index 1949dc4..1c7276d 100755 --- a/test/stop-daemon +++ b/test/stop-daemon @@ -1,25 +1,25 @@ #!/bin/sh -rm -f cld.port +rm -f cld?.port -if [ ! -f cld.pid ] +if [ ! -f cld1.pid -o ! -f cld2.pid -o ! -f cld3.pid ] then - echo no daemon pid file found. + echo at least one daemon pid file missing. exit 1 fi -kill `cat cld.pid` +kill `cat cld*.pid` for n in 0 1 2 3 4 5 6 7 8 9 do - if [ ! -f cld.pid ] + if [ -f cld1.pid -o -f cld2.pid -o -f cld3.pid ] then + sleep 1 + else exit 0 fi - - sleep 1 done -echo "PID file not removed, after signal sent." -rm -f cld.pid +echo "PID file(s) not removed, after signal sent." +rm -f cld*.pid exit 1
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html