[Patch 1/1] tabled: Add replication daemon

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This patch adds what amounts to a background process that maintains
redundancy for object data. It is far from the complete solution.
For one thing, it does not verify checksums. But it's a start.

There's no way to turn this off, by intention. The whole thing must
work very reliably, not steal too much from benchmarks (but if it
does, it's only honest to take the hit). It is indispensible.
However, there's a plan to add useful monitoring of jobs and other
state, such as available nodes.

This implementation uses a separate thread.

Signed-off-by: Pete Zaitcev <zaitcev@xxxxxxxxxx>

---
 configure.ac       |    2 
 server/Makefile.am |    4 
 server/object.c    |    6 
 server/replica.c   |  855 +++++++++++++++++++++++++++++++++++++++++++
 server/server.c    |   21 -
 server/storage.c   |   72 +++
 server/tabled.h    |   24 +
 7 files changed, 966 insertions(+), 18 deletions(-)

Can go on top of tabled with CLD de-timerization or without.

commit 9a543b33b53f4ce67fc104270eedbd064d44aa1a
Author: Master <zaitcev@xxxxxxxxxxxxxxxxxxxxx>
Date:   Thu Nov 26 19:28:57 2009 -0700

    First stab at replication (threaded).

diff --git a/configure.ac b/configure.ac
index 5b4599c..29673a2 100644
--- a/configure.ac
+++ b/configure.ac
@@ -100,7 +100,7 @@ dnl --------------------------
 dnl autoconf output generation
 dnl --------------------------
 
-AM_PATH_GLIB_2_0(2.0.0)
+AM_PATH_GLIB_2_0(2.0.0, , exit 1, gthread)
 AM_PATH_XML2(2.6.0, ,
   [AC_MSG_ERROR([Missing required XML2 >= 2.6.0])])
 LIBCURL_CHECK_CONFIG(, 7.10.1, ,
diff --git a/server/Makefile.am b/server/Makefile.am
index f994b36..8faa95a 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -4,8 +4,8 @@ INCLUDES	= -I$(top_srcdir)/include @GLIB_CFLAGS@ @CHUNKDC_CFLAGS@ @CLDC_CFLAGS@
 sbin_PROGRAMS	= tabled tdbadm
 
 tabled_SOURCES	= tabled.h		\
-		  bucket.c object.c server.c storage.c storparse.c cldu.c \
-		  config.c util.c
+		  bucket.c object.c server.c storage.c storparse.c replica.c \
+		  cldu.c config.c util.c
 tabled_LDADD	= ../lib/libhttputil.a ../lib/libtdb.a		\
 		  @CHUNKDC_LIBS@ @CLDC_LIBS@ @PCRE_LIBS@ @GLIB_LIBS@ \
 		  @CRYPTO_LIBS@ @DB4_LIBS@ @EVENT_LIBS@ @ARGP_LIBS@ @SSL_LIBS@
diff --git a/server/object.c b/server/object.c
index 5c0f97f..0793f07 100644
--- a/server/object.c
+++ b/server/object.c
@@ -151,6 +151,7 @@ static void object_unlink(struct db_obj_ent *obj)
 			       "object data(%llX) unlink failed on nid %u",
 			       (unsigned long long) GUINT64_FROM_LE(addr->oid),
 			       nid);
+		stor_node_put(stnode);
 	}
 }
 
@@ -694,7 +695,7 @@ static struct open_chunk *open_chunk1(struct storage_node *stnode,
 		goto err_alloc;
 	}
 
-	rc = stor_open(ochunk, stnode);
+	rc = stor_open(ochunk, stnode, tabled_srv.evbase_main);
 	if (rc != 0) {
 		applog(LOG_WARNING, "Cannot open output chunk, nid %u (%d)",
 		       stnode->id, rc);
@@ -1183,6 +1184,7 @@ bool object_get_body(struct client *cli, const char *user, const char *bucket,
 					applog(LOG_DEBUG,
 					       "Selected nid %u for oid %llX",
 					       nid, cli->in_objid);
+				stor_node_put(stnode);
 				break;
 			}
 		}
@@ -1191,7 +1193,7 @@ bool object_get_body(struct client *cli, const char *user, const char *bucket,
 	if (!stnode)
 		goto stnode_open_retry;
 
-	rc = stor_open(&cli->in_ce, stnode);
+	rc = stor_open(&cli->in_ce, stnode, tabled_srv.evbase_main);
 	if (rc < 0) {
 		applog(LOG_WARNING, "Cannot open input chunk, nid %u (%d)",
 		       stnode->id, rc);
diff --git a/server/replica.c b/server/replica.c
new file mode 100644
index 0000000..17aaaf3
--- /dev/null
+++ b/server/replica.c
@@ -0,0 +1,855 @@
+
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING.  If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define _GNU_SOURCE
+#include "tabled-config.h"
+#include <sys/types.h>
+#include <string.h>
+#include <syslog.h>
+#include <time.h>
+#include <db.h>
+#include <elist.h>
+#include "tabled.h"
+
+/*
+ * Replication Job
+ */
+struct rep_job {
+	struct list_head jlink;
+
+	uint64_t oid;
+	uint64_t size;		/* all of the object */
+	time_t start_time;
+	/* cannot look up by oid, keep key */
+	size_t klen;
+	struct db_obj_key *key;
+
+	struct storage_node *src, *dst;
+	struct open_chunk in_ce, out_ce;
+	long in_len;		/* can MIN() take long long? */
+	char *buf;
+	char *bptr;		/* points into buf */
+	ssize_t bcnt;		/* currently in buf */
+};
+
+struct rep_jobs {
+	int njobs;
+	struct list_head jlist;
+};
+
+static struct rep_jobs active = { 0, LIST_HEAD_INIT(active.jlist) };
+static struct rep_jobs queue = { 0, LIST_HEAD_INIT(queue.jlist) };
+static struct rep_jobs done = { 0, LIST_HEAD_INIT(done.jlist) };
+
+static struct event_base *evbase;
+static struct event kscan_timer;	/* db4 key rescan timer */
+static bool kscan_enabled = false;
+static GThread *scan_thread;
+static time_t kscan_last;
+
+static void job_dispatch(void);
+
+/* should've called this job_alloc_and_fill actually */
+static struct rep_job *job_alloc(size_t klen, struct db_obj_key *key)
+{
+	struct rep_job *job;
+	size_t len;
+
+	len = sizeof(struct rep_job) + klen;
+	job = malloc(len);
+	if (job) {
+		memset(job, 0, sizeof(struct rep_job));
+		memcpy(job+1, key, klen);
+		job->klen = klen;
+		job->key = (struct db_obj_key *)(job+1);
+	}
+	return job;
+}
+
+static void job_free(struct rep_job *job)
+{
+	if (job->src)
+		stor_node_put(job->src);
+	if (job->dst)
+		stor_node_put(job->dst);
+	free(job->buf);
+	free(job);
+}
+
+/* N.B. the current calling convention is to wait for drain on the socket */
+static void job_done(struct rep_job *job)
+{
+/* P3 */ applog(LOG_INFO, "job done oid %llX", (long long) job->oid);
+	if (!stor_put_end(&job->out_ce)) {
+		applog(LOG_ERR, "Chunk sync failed on nid %u", job->dst->id);
+	}
+	stor_close(&job->out_ce);
+	stor_close(&job->in_ce);
+
+	list_del(&job->jlink);
+	--active.njobs;
+
+	list_add(&job->jlink, &done.jlist);
+	done.njobs++;
+}
+
+static void job_abend(struct rep_job *job)
+{
+/* P3 */ applog(LOG_INFO, "job abend from %u to %u oid %llX",
+  job->src->id, job->dst->id, (long long) job->oid);
+	stor_abort(&job->out_ce);
+	stor_close(&job->out_ce);
+	stor_close(&job->in_ce);
+
+	list_del(&job->jlink);
+	--active.njobs;
+	job_free(job);
+}
+
+static int job_submit_buf(struct rep_job *job, char *buf, ssize_t len)
+{
+	ssize_t bytes;
+
+	job->bptr = buf;
+	job->bcnt = len;
+
+	bytes = stor_put_buf(&job->out_ce, job->bptr, job->bcnt);
+	if (bytes < 0) {
+		job->bcnt = 0;
+		if (debugging)
+			applog(LOG_DEBUG, "stor_put_buf failed (%d)", bytes);
+		return bytes;
+	}
+	job->bptr += bytes;
+	job->bcnt -= bytes;
+	return 0;
+}
+
+static void job_get_poke(struct rep_job *job)
+{
+	ssize_t bytes;
+
+	for (;;) {
+		if (job->bcnt != 0)
+			break;
+		if (!job->in_len)
+			break;
+		bytes = stor_get_buf(&job->in_ce, job->buf,
+				     MIN(job->in_len, CLI_DATA_BUF_SZ));
+		if (bytes < 0) {
+			applog(LOG_ERR, "read failed oid %llX at nid %u",
+			       (unsigned long long) job->oid, job->src->id);
+			goto err_out;
+		}
+		if (bytes == 0)
+			break;
+		if (job_submit_buf(job, job->buf, bytes))
+			goto err_out;
+		job->in_len -= bytes;
+	}
+
+	if (job->bcnt == 0 && job->in_len == 0) {
+		job_done(job);
+		return;
+	}
+
+	/*
+	 * Since storage events automatically arm and disarm themselves,
+	 * we can just return to the main loop without a fear of looping.
+	 */
+	return;
+
+err_out:
+	job_abend(job);
+	return;
+}
+
+static void job_get_event(struct open_chunk *stp)
+{
+	job_get_poke(stp->cli);
+
+	job_dispatch();
+}
+
+static void job_put_poke(struct rep_job *job)
+{
+	ssize_t bytes;
+
+	bytes = stor_put_buf(&job->out_ce, job->bptr, job->bcnt);
+	if (bytes < 0) {
+		job->bcnt = 0;
+		applog(LOG_ERR, "write failed oid %llX at nid %u",
+		       (unsigned long long) job->oid, job->src->id);
+		job_abend(job);
+		return;
+	}
+	job->bptr += bytes;
+	job->bcnt -= bytes;
+
+	if (!job->bcnt)
+		job_get_poke(job);
+}
+
+static void job_put_event(struct open_chunk *stp)
+{
+	struct rep_job *job = stp->cli;
+
+	if (job->bcnt) {
+		job_put_poke(job);
+	} else {
+		job_get_poke(job);
+	}
+
+	job_dispatch();
+}
+
+/* well, not much scheduling for now, just throw to the tail of the queue. */
+static int job_schedule(struct rep_job *job)
+{
+
+	job->start_time = time(NULL);
+
+	/* P3 */ applog(LOG_INFO, "job oid %llX start %lu from %u to %u",
+	    job->oid, (long)job->start_time, job->src->id, job->dst->id);
+
+	list_add(&job->jlink, &queue.jlist);
+	queue.njobs++;
+	return 0;
+}
+
+/* FIXME needs to loop while active.njobs < max or something */
+static void job_dispatch()
+{
+	struct rep_job *job;
+	uint64_t objsize;	/* As reported by Chunk. Not used. */
+	int rc;
+
+	if (active.njobs >= 2)	/* FIXME: Bogus. Need to know current loads. */
+		return;
+
+	if (list_empty(&queue.jlist))
+		return;
+	job = list_entry(queue.jlist.next, struct rep_job, jlink);
+	list_del(&job->jlink);
+	--queue.njobs;
+
+	job->buf = malloc(CLI_DATA_BUF_SZ);
+	if (!job->buf)
+		goto err_malloc;
+
+	rc = stor_open(&job->in_ce, job->src, evbase);
+	if (rc) {
+		applog(LOG_WARNING, "Cannot open input chunk, nid %u (%d)",
+		       job->src->id, rc);
+		goto err_inopen;
+	}
+	job->in_ce.cli = job;
+
+	rc = stor_open(&job->out_ce, job->dst, evbase);
+	if (rc) {
+		applog(LOG_WARNING, "Cannot open output chunk, nid %u (%d)",
+		       job->dst->id, rc);
+		goto err_outopen;
+	}
+	job->out_ce.cli = job;
+
+	rc = stor_open_read(&job->in_ce, job_get_event, job->oid, &objsize);
+	if (rc) {
+		applog(LOG_ERR, "Cannot start nid %u for oid %llX (%d)",
+		       job->src->id, (unsigned long long) job->oid, rc);
+		goto err_read;
+	}
+	job->in_len = job->size;
+
+	rc = stor_put_start(&job->out_ce, job_put_event, job->oid, job->size);
+	if (rc) {
+		applog(LOG_ERR, "Cannot start putting, nid %u (%d)",
+		       job->dst->id, rc);
+		goto err_put;
+	}
+
+	list_add(&job->jlink, &active.jlist);
+	active.njobs++;
+
+	job_get_poke(job);	/* required to start */
+
+	return;
+
+err_put:
+err_read:
+	stor_close(&job->out_ce);
+err_outopen:
+	stor_close(&job->in_ce);
+err_inopen:
+	/* no free(buf) since job_free does it */
+err_malloc:
+	job_free(job);
+	return;
+}
+
+static struct storage_node *job_select_src(int nnum,
+					   struct storage_node *nvec[])
+{
+	if (nnum == 0)
+		return NULL;
+	return stor_node_get(nvec[rand() % nnum]);
+}
+
+/* FIXME Need to select by capacity and load. Ditto in initial selection. */
+static struct storage_node *job_select_dst(int nnum,
+					   struct storage_node *nvec[])
+{
+	enum { NRAND = 20 };
+	struct storage_node *tmp[NRAND];
+	int n, i;
+	struct storage_node *stn;
+	time_t t1;
+
+	g_mutex_lock(tabled_srv.bigmutex);
+	t1 = time(NULL);
+	n = 0;
+	list_for_each_entry(stn, &tabled_srv.all_stor, all_link) {
+		if (!stn->up)
+			continue;
+		if (t1 > stn->last_up + CHUNK_REBOOT_TIME)
+			continue;
+
+		/* de-dup with source */
+		for (i = 0; i < nnum; i++) {
+			if (nvec[i] == stn)
+				break;
+		}
+		if (i < nnum)
+			continue;
+
+		tmp[n] = stn;
+		n++;
+	}
+	if (n == 0) {
+		g_mutex_unlock(tabled_srv.bigmutex);
+		return NULL;
+	}
+	stn = stor_node_get(tmp[rand() % n]);
+	g_mutex_unlock(tabled_srv.bigmutex);
+	return stn;
+}
+
+static struct rep_job *job_find_by_oid(uint64_t oid)
+{
+	struct rep_job *pos;
+
+	list_for_each_entry(pos, &queue.jlist, jlink) {
+		if (pos->oid == oid)
+			return pos;
+	}
+	list_for_each_entry(pos, &active.jlist, jlink) {
+		if (pos->oid == oid)
+			return pos;
+	}
+	return NULL;
+}
+
+/* start replicating the key somewhere */
+static void rep_job_start(size_t klen, struct db_obj_key *key,
+			  uint64_t oid, uint64_t objsize,
+			  int nnum, struct storage_node *nvec[])
+{
+	struct rep_job *job;
+
+	if (objsize == 0) {
+		static int cnt = 10;
+		if (cnt > 0) {	/* internal error; if it ever hits, it floods */
+			--cnt;
+			applog(LOG_ERR, "Submitting oid %llX with zero size",
+			       (long long) oid);
+		}
+		return;
+	}
+	if (job_find_by_oid(oid) != NULL)
+		return;
+	job = job_alloc(klen, key);
+	if (!job)
+		goto err_alloc;
+	job->oid = oid;
+	job->size = objsize;
+	job->src = job_select_src(nnum, nvec);
+	if (!job->src) {
+		/* P3 */ applog(LOG_INFO, "no src oid %llX", (long long) oid);
+		goto err_src;
+	}
+	job->dst = job_select_dst(nnum, nvec);
+	if (!job->dst) {
+		/* P3 */ applog(LOG_INFO, "no dst oid %llX", (long long) oid);
+		goto err_dst;
+	}
+	if (job->src->id == job->dst->id) {
+		/* Is this bad enough to invoke exit(1) right here? */
+		applog(LOG_ERR, "Internal error, copy from/to nid %u",
+		       job->src->id);
+		return;
+	}
+	if (job_schedule(job) != 0)
+		goto err_sched;
+	job_dispatch();
+	return;
+
+err_sched:
+err_dst:
+err_src:
+	job_free(job);
+err_alloc:
+	return;
+}
+
+/*
+ * rep_scan() and friends
+ * Read the whole db of keys, replicate those below redundancy.
+ */
+
+struct cursor {		/* our own "soft" cursor, works across transactions */
+	size_t klen;	/* zero possible, means no key */
+	struct db_obj_key *key;
+	DB_ENV *db_env;
+	DB     *db_objs;
+	DB_TXN *db_txn;
+	DBC    *db_cur;
+};
+
+static int rep_scan_open(struct cursor *cp)
+{
+	int rc;
+
+	rc = cp->db_env->txn_begin(cp->db_env, NULL, &cp->db_txn, 0);
+	if (rc) {
+		cp->db_env->err(cp->db_env, rc, "DB_ENV->txn_begin");
+		goto err_none;
+	}
+
+	// DB_WRITECURSOR ?  DB_BULK ?
+	rc = cp->db_objs->cursor(cp->db_objs, cp->db_txn, &cp->db_cur, 0);
+	if (rc) {
+		cp->db_objs->err(cp->db_objs, rc, "objs->cursor");
+		goto err_out;
+	}
+
+	return 0;
+
+err_out:
+	rc = cp->db_txn->abort(cp->db_txn);
+	if (rc)
+		cp->db_env->err(cp->db_env, rc, "DB_ENV->txn_abort");
+err_none:
+	return -1;
+}
+
+static void rep_scan_close(struct cursor *cp)
+{
+	int rc;
+
+	rc = cp->db_cur->close(cp->db_cur);
+	if (rc) {
+		cp->db_objs->err(cp->db_objs, rc, "objs->cursor close");
+		goto err_out;
+	}
+	cp->db_cur = NULL;
+
+	rc = cp->db_txn->commit(cp->db_txn, 0);
+	if (rc)
+		cp->db_env->err(cp->db_env, rc, "DB_ENV->txn_commit");
+	cp->db_txn = NULL;
+	return;
+
+err_out:
+	rc = cp->db_txn->abort(cp->db_txn);
+	if (rc)
+		cp->db_env->err(cp->db_env, rc, "DB_ENV->txn_abort");
+	return;
+}
+
+/* get next */
+static int rep_scan_get(struct cursor *cp, struct db_obj_ent **pobj)
+{
+	unsigned int get_flags;
+	DBT pkey, pval;
+	int rc;
+
+	if (cp->db_cur) {
+		get_flags = DB_NEXT;
+	} else {
+		if (rep_scan_open(cp) != 0)
+			return -1;
+		get_flags = DB_SET_RANGE;
+	}
+
+	memset(&pkey, 0, sizeof(pkey));
+	pkey.data = cp->key;
+	pkey.size = cp->klen;
+
+	memset(&pval, 0, sizeof(pval));
+	pval.flags = DB_DBT_MALLOC;
+
+	rc = cp->db_cur->get(cp->db_cur, &pkey, &pval, get_flags);
+	if (rc) {
+		if (rc != DB_NOTFOUND)
+			cp->db_objs->err(cp->db_objs, rc, "cur->get for keys");
+		return -1;
+	}
+
+	*pobj = pval.data;
+	return 0;
+}
+
+/* parse object into cursor state */
+static int rep_scan_parse(struct cursor *cp, struct db_obj_ent *obj)
+{
+	unsigned int obj_koff, obj_klen;
+	struct db_obj_key *okey;
+
+	obj_klen = GUINT16_FROM_LE(*(uint16_t *)(obj+1));
+	if (obj_klen >= 64*1024) {	/* byteswapped or corrupt */
+		applog(LOG_ERR, "bad key length %d", obj_klen);
+		return -1;
+	}
+	obj_koff = obj->n_str * sizeof(uint16_t);
+
+	okey = malloc(64 + obj_klen);
+
+	memcpy(okey->bucket, obj->bucket, 64);
+	memcpy(okey->key, (char *)(obj+1) + obj_koff, obj_klen);
+
+	free(cp->key);
+	cp->key = okey;
+	cp->klen = 64 + obj_klen;
+	return 0;
+}
+
+/* meat of scan - check if replication is need on the key */
+static void rep_scan_verify(struct cursor *cp, struct db_obj_ent *obj)
+{
+	char bucket_name[65];
+	char object_name[1025];
+	uint64_t oid;
+	int i;
+	struct storage_node *redvec[MAXWAY];
+	int allcnt, redcnt;
+	uint32_t nid;
+	struct storage_node *stn;
+	time_t t1;
+
+	memcpy(bucket_name, cp->key->bucket, 64);
+	bucket_name[64] = 0;
+	memcpy(object_name, cp->key->key, cp->klen - 64);
+	object_name[cp->klen - 64] = 0;
+
+	t1 = time(NULL);
+
+	allcnt = 0;
+	redcnt = 0;
+	for (i = 0; i < MAXWAY; i++) {
+		nid = GUINT32_FROM_LE(obj->d.a.nidv[i]);
+		if (!nid)
+			continue;
+		stn = stor_node_by_nid(nid);
+		if (!stn)
+			continue;
+		allcnt++;
+		if (!stn->up) {
+			stor_node_put(stn);
+			continue;
+		}
+		if (t1 > stn->last_up + CHUNK_REBOOT_TIME) {
+			stor_node_put(stn);
+			continue;
+		}
+		/*
+		 * This is where we later ask chunks for checksums (TODO).
+		 */
+
+		redvec[redcnt] = stn;
+		redcnt++;
+	}
+
+	oid = GUINT64_FROM_LE(obj->d.a.oid);
+
+	applog(LOG_INFO, "bucket %s key %s oid %llX n(%u,%u,%u): all %d ok %d",
+	       bucket_name, object_name, (long long) oid,
+	       GUINT32_FROM_LE(obj->d.a.nidv[0]),
+	       GUINT32_FROM_LE(obj->d.a.nidv[1]),
+	       GUINT32_FROM_LE(obj->d.a.nidv[2]),
+	       allcnt, redcnt);
+
+	if (redcnt < MAXWAY) {		/* maybe have MINWAY too? */
+		rep_job_start(cp->klen, cp->key, oid,
+			      GUINT64_FROM_LE(obj->size),
+			      redcnt, redvec);
+	}
+
+	for (i = 0; i < redcnt; i++)
+		stor_node_put(redvec[i]);
+}
+
+static void rep_add_nid(unsigned int klen, struct db_obj_key *key, uint32_t nid)
+{
+	DB_ENV *db_env = tdb.env;
+	DB *db_objs = tdb.objs;
+	DB_TXN *db_txn;
+	DBT pkey, pval;
+	struct db_obj_ent *obj;
+	ssize_t oelen;
+	unsigned empty;
+	uint32_t n;
+	int i;
+	int rc;
+
+	rc = db_env->txn_begin(db_env, NULL, &db_txn, 0);
+	if (rc) {
+		db_env->err(db_env, rc, "DB_ENV->txn_begin");
+		goto err_none;
+	}
+
+	memset(&pkey, 0, sizeof(pkey));
+	pkey.data = key;
+	pkey.size = klen;
+
+	memset(&pval, 0, sizeof(pval));
+	pval.flags = DB_DBT_MALLOC;
+
+	rc = db_objs->get(db_objs, db_txn, &pkey, &pval, DB_RMW);
+	if (rc) {
+		db_env->err(db_env, rc, "objs->get");
+		goto err_get;
+	}
+
+	obj = pval.data;
+	oelen = pval.size;
+
+	memset(&pkey, 0, sizeof(pkey));
+	pkey.data = key;
+	pkey.size = klen;
+
+	rc = db_objs->del(db_objs, db_txn, &pkey, 0);
+	if (rc) {
+		db_objs->err(db_objs, rc, "objs->del");
+		goto err_del;
+	}
+
+	empty = 0;
+	for (i = 0; i < MAXWAY; i++) {
+		n = GUINT32_FROM_LE(obj->d.a.nidv[i]);
+		if (n && n == nid) {
+			applog(LOG_WARNING,
+			       "object %llX already has nid %u",
+			       (long long) GUINT64_FROM_LE(obj->d.a.oid), nid);
+			goto err_check;
+		}
+		if (!n)
+			empty++;
+	}
+	if (!empty) {
+		applog(LOG_WARNING,
+		      "object %llX already fully redundant, dropping nid %u",
+		       (long long) GUINT64_FROM_LE(obj->d.a.oid), nid);
+		goto err_check;
+	}
+
+	for (i = 0; i < MAXWAY; i++) {
+		if (!obj->d.a.nidv[i]) {
+			obj->d.a.nidv[i] = GUINT32_TO_LE(nid);
+			break;
+		}
+	}
+
+	memset(&pkey, 0, sizeof(pkey));
+	pkey.data = key;
+	pkey.size = klen;
+
+	memset(&pval, 0, sizeof(pval));
+	pval.data = obj;
+	pval.size = oelen;
+
+	rc = db_objs->put(db_objs, db_txn, &pkey, &pval, 0);
+	if (rc) {
+		db_env->err(db_env, rc, "objs->put");
+		goto err_put;
+	}
+
+	free(obj);
+
+	rc = db_txn->commit(db_txn, 0);
+	if (rc) {
+		db_env->err(db_env, rc, "DB_ENV->txn_commit");
+	}
+	return;
+
+err_put:
+err_check:
+err_del:
+	free(obj);
+err_get:
+	rc = db_txn->abort(db_txn);
+	if (rc)
+		db_env->err(db_env, rc, "DB_ENV->txn_abort");
+err_none:
+	return;
+}
+
+static void rep_retire(void)
+{
+	struct rep_job *job;
+
+	while (!list_empty(&done.jlist)) {
+		job = list_entry(done.jlist.next, struct rep_job, jlink);
+		list_del(&job->jlink);
+		--done.njobs;
+
+		rep_add_nid(job->klen, job->key, job->dst->id);
+		job_free(job);
+	}
+}
+
+static void rep_scan(void)
+{
+	struct cursor cur;
+	struct db_obj_ent *obj;
+	unsigned long kcnt;
+	time_t start_time, t;
+
+	rep_retire();
+	job_dispatch();
+
+	start_time = time(NULL);
+	if (debugging)
+		applog(LOG_DEBUG, "key scan start time %lu", (long)start_time);
+	kscan_last = start_time;
+
+	memset(&cur, 0, sizeof(struct cursor));	/* enough to construct */
+	cur.db_env = tdb.env;
+	cur.db_objs = tdb.objs;
+
+	kcnt = 0;
+	for (;;) {
+		/* FIXME: need to limit queing by some sane number like number of stn */
+		if (queue.njobs >= 100) {
+			/* P3 */ applog(LOG_INFO, "overload %u", queue.njobs);
+			return;
+		}
+		if ((t = time(NULL)) >= start_time + 2) {
+			if (debugging)
+				applog(LOG_DEBUG,
+				       "db release at keys %lu seconds %lu",
+				       kcnt, (long)t);
+			rep_scan_close(&cur);
+		}
+
+		if (rep_scan_get(&cur, &obj) != 0)
+			break;
+
+		/* not needed for db4 with DB_NEXT, but eases our logic */
+		if (rep_scan_parse(&cur, obj) != 0) {
+			free(obj);
+			continue;
+		}
+
+		if (!GUINT32_FROM_LE(obj->flags) & DB_OBJ_INLINE)
+			rep_scan_verify(&cur, obj);
+
+		free(obj);
+		kcnt++;
+	}
+
+	rep_scan_close(&cur);
+	free(cur.key);
+	cur.key = NULL;
+
+	if (debugging)
+		applog(LOG_DEBUG, "key scan done keys %lu", kcnt);
+	return;
+}
+
+static void add_kscan_timer(void)
+{
+	static const struct timeval tv = { TABLED_RESCAN_SEC, 0 };
+
+	if (evtimer_add(&kscan_timer, &tv) < 0)
+		applog(LOG_WARNING, "unable to add key scan timer");
+}
+
+static void tdb_keyscan(int fd, short events, void *userdata)
+{
+	if (kscan_enabled)
+		rep_scan();
+	add_kscan_timer();
+}
+
+static gpointer rep_thread_func(gpointer data)
+{
+	int rc;
+
+	rc = event_reinit(evbase);
+	if (rc) {
+		applog(LOG_ERR, "rep event_reinit failed (%d)", rc);
+		return NULL;
+	}
+
+	evtimer_set(&kscan_timer, tdb_keyscan, NULL);
+	event_base_set(evbase, &kscan_timer);
+
+	/*
+	 * We must add an event now, or else event_base_dispatch will
+	 * exit right away with code 1.
+	 */
+	add_kscan_timer();
+
+	for (;;) {
+		rc = event_base_dispatch(evbase);
+		applog(LOG_ERR, "rep event_base_dispatch exits (%d)", rc);
+		sleep(300);	/* Should not happen, so maybe exit(1)? */
+	}
+	return NULL;
+}
+
+void rep_init(struct event_base *ev_base)
+{
+	GError *error;
+
+	/* Statics are a silly way to pass arguments to threads, but oh well. */
+	evbase = ev_base;
+
+	scan_thread = g_thread_create(rep_thread_func, NULL, FALSE, &error);
+	if (scan_thread == NULL) {
+		applog(LOG_ERR, "Failed to start replication thread: %s",
+		       error->message);
+		exit(1);
+	}
+}
+
+void rep_start()
+{
+	kscan_enabled = true;
+}
+
+void rep_stats()
+{
+	time_t now;
+
+	now = time(NULL);
+	applog(LOG_INFO, "REP queued %d active %d done %d last %lu (+ %ld)",
+	       queue.njobs, active.njobs, done.njobs,
+	       (long) kscan_last, (long) (now - kscan_last));
+}
diff --git a/server/server.c b/server/server.c
index a5935ad..c9e5359 100644
--- a/server/server.c
+++ b/server/server.c
@@ -374,6 +374,8 @@ static void stats_dump(void)
 	X(opt_write);
 	applog(LOG_INFO, "State: TDB %s",
 	    state_name_tdb[tabled_srv.state_tdb]);
+	stor_stats();
+	rep_stats();
 }
 
 #undef X
@@ -1362,7 +1364,7 @@ static void tdb_state_cb(enum db_event event)
  *
  * We don't even bother with registering this callback, just call it by name. 
  *
- * The return value is used to re-arm rescan mechanism.
+ * The return value is used to re-arm storage rescan mechanism.
  */
 int stor_update_cb(void)
 {
@@ -1380,8 +1382,15 @@ int stor_update_cb(void)
 				applog(LOG_DEBUG, " NID %u is up", stn->id);
 			num_up++;
 			stn->up = true;
+			stn->last_up = time(NULL);
+		} else {
+			if (stn->last_up != 0 &&
+			    time(NULL) >= stn->last_up + CHUNK_REBOOT_TIME) {
+				applog(LOG_INFO, " NID %u went down", stn->id);
+			}
 		}
 	}
+
 	if (num_up < 1) {
 		applog(LOG_INFO, "No active storage node(s), waiting");
 		return num_up;
@@ -1692,6 +1701,7 @@ static void tdb_state_process(enum st_tdb new_state)
 			return;
 		}
 		add_chkpt_timer();
+		rep_start();
 		net_listen();
 	}
 }
@@ -1700,6 +1710,7 @@ int main (int argc, char *argv[])
 {
 	error_t aprc;
 	int rc = 1;
+	struct event_base *event_base_rep;
 
 	INIT_LIST_HEAD(&tabled_srv.all_stor);
 	tabled_srv.state_tdb = ST_TDB_INIT;
@@ -1709,6 +1720,9 @@ int main (int argc, char *argv[])
 
 	compile_patterns();
 
+	g_thread_init(NULL);
+	tabled_srv.bigmutex = g_mutex_new();
+
 	SSL_library_init();
 	SSL_load_error_strings();
 
@@ -1767,7 +1781,8 @@ int main (int argc, char *argv[])
 	signal(SIGTERM, term_signal);
 	signal(SIGUSR1, stats_signal);
 
-	event_init();
+	event_base_rep = event_base_new();
+	tabled_srv.evbase_main = event_init();
 	evtimer_set(&tabled_srv.chkpt_timer, tdb_checkpoint, NULL);
 
 	/* set up server networking */
@@ -1780,6 +1795,8 @@ int main (int argc, char *argv[])
 		goto err_cld_session;
 	}
 
+	rep_init(event_base_rep);
+
 	applog(LOG_INFO, "initialized (%s)",
 	   (tabled_srv.flags & SFL_FOREGROUND)? "fg": "bg");
 
diff --git a/server/storage.c b/server/storage.c
index fdb3fc1..b8f74e4 100644
--- a/server/storage.c
+++ b/server/storage.c
@@ -17,6 +17,26 @@
 static const char stor_key_fmt[] = "%016llx";
 #define STOR_KEY_SLEN  16
 
+struct storage_node *stor_node_get(struct storage_node *sn)
+{
+	sn->ref++;
+	if (sn->ref == 103) {		/* FIXME debugging test */
+		applog(LOG_ERR, "ref leak in storage node nid %u", sn->id);
+	}
+	return sn;
+}
+
+void stor_node_put(struct storage_node *sn)
+{
+
+	/* Would be an error in the current code, we never free them. */
+	if (sn->ref == 1) {
+		applog(LOG_ERR, "freeing storage node nid %u", sn->id);
+		return;
+	}
+	--sn->ref;
+}
+
 static int stor_new_stc(struct storage_node *stn, struct st_client **stcp)
 {
 	struct st_client *stc;
@@ -70,7 +90,8 @@ static void stor_write_event(int fd, short events, void *userdata)
 /*
  * Open *cep using stn, set up chunk session if needed.
  */
-int stor_open(struct open_chunk *cep, struct storage_node *stn)
+int stor_open(struct open_chunk *cep, struct storage_node *stn,
+	      struct event_base *ev_base)
 {
 	int rc;
 
@@ -80,8 +101,8 @@ int stor_open(struct open_chunk *cep, struct storage_node *stn)
 	if ((rc = stor_new_stc(stn, &cep->stc)) < 0)
 		return rc;
 
-	cep->node = stn;
-	stn->nchu++;
+	cep->evbase = ev_base;
+	cep->node = stor_node_get(stn);
 
 	/* cep->stc->verbose = 1; */
 
@@ -111,6 +132,7 @@ int stor_put_start(struct open_chunk *cep, void (*cb)(struct open_chunk *),
 	cep->wkey = key;
 	cep->wcb = cb;
 	event_set(&cep->wevt, cep->wfd, EV_WRITE, stor_write_event, cep);
+	event_base_set(cep->evbase, &cep->wevt);
 
 	if (debugging)
 		applog(LOG_INFO, "stor nid %u put %s new for %lld",
@@ -149,6 +171,7 @@ int stor_open_read(struct open_chunk *cep, void (*cb)(struct open_chunk *),
 	cep->roff = 0;
 	cep->rcb = cb;
 	event_set(&cep->revt, cep->rfd, EV_READ, stor_read_event, cep);
+	event_base_set(cep->evbase, &cep->revt);
 
 	if (debugging)
 		applog(LOG_INFO, "stor nid %u get %s size %lld",
@@ -163,7 +186,7 @@ int stor_open_read(struct open_chunk *cep, void (*cb)(struct open_chunk *),
 void stor_close(struct open_chunk *cep)
 {
 	if (cep->stc) {
-		--cep->node->nchu;
+		stor_node_put(cep->node);
 		cep->node = NULL;
 		stc_free(cep->stc);
 		cep->stc = NULL;
@@ -205,7 +228,7 @@ void stor_abort(struct open_chunk *cep)
 
 	rc = stor_new_stc(cep->node, &cep->stc);
 	if (rc < 0) {
-		--cep->node->nchu;
+		stor_node_put(cep->node);
 		cep->node = NULL;
 
 		if (debugging)
@@ -349,7 +372,7 @@ bool stor_obj_test(struct open_chunk *cep, uint64_t key)
 	return true;
 }
 
-struct storage_node *stor_node_by_nid(uint32_t nid)
+static struct storage_node *_stor_node_by_nid(uint32_t nid)
 {
 	struct storage_node *sn;
 
@@ -360,6 +383,17 @@ struct storage_node *stor_node_by_nid(uint32_t nid)
 	return NULL;
 }
 
+struct storage_node *stor_node_by_nid(uint32_t nid)
+{
+	struct storage_node *sn;
+
+	g_mutex_lock(tabled_srv.bigmutex);
+	sn = _stor_node_by_nid(nid);
+	stor_node_get(sn);
+	g_mutex_unlock(tabled_srv.bigmutex);
+	return sn;
+}
+
 static int stor_add_node_addr(struct storage_node *sn,
 			      const char *hostname, const char *portstr)
 {
@@ -422,13 +456,15 @@ void stor_add_node(uint32_t nid, const char *hostname, const char *portstr,
 {
 	struct storage_node *sn;
 
-	sn = stor_node_by_nid(nid);
+	g_mutex_lock(tabled_srv.bigmutex);
+	sn = _stor_node_by_nid(nid);
 	if (sn) {
 		stor_add_node_addr(sn, hostname, portstr);
 	} else {
 		if ((sn = malloc(sizeof(struct storage_node))) == NULL) {
 			applog(LOG_WARNING, "No core (%ld)",
 			       (long) sizeof(struct storage_node));
+			g_mutex_unlock(tabled_srv.bigmutex);
 			return;
 		}
 		memset(sn, 0, sizeof(struct storage_node));
@@ -437,17 +473,23 @@ void stor_add_node(uint32_t nid, const char *hostname, const char *portstr,
 		if ((sn->hostname = strdup(hostname)) == NULL) {
 			applog(LOG_WARNING, "No core");
 			free(sn);
+			g_mutex_unlock(tabled_srv.bigmutex);
 			return;
 		}
 
 		if (stor_add_node_addr(sn, hostname, portstr)) {
+			free(sn->hostname);
 			free(sn);
+			g_mutex_unlock(tabled_srv.bigmutex);
 			return;
 		}
 
+		stor_node_get(sn);
+
 		list_add(&sn->all_link, &tabled_srv.all_stor);
 		tabled_srv.num_stor++;
 	}
+	g_mutex_unlock(tabled_srv.bigmutex);
 }
 
 /* Return 0 if the node checks out ok */
@@ -481,3 +523,19 @@ int stor_node_check(struct storage_node *stn)
 	return 0;
 }
 
+void stor_stats()
+{
+	struct storage_node *sn;
+	time_t now;
+
+	g_mutex_lock(tabled_srv.bigmutex);
+	now = time(NULL);
+	list_for_each_entry(sn, &tabled_srv.all_stor, all_link) {
+		applog(LOG_INFO, "SN nid %u %s last %lu (+ %ld) ref %d name %s",
+		       sn->id, sn->up? "up": "down",
+		       (long) sn->last_up, (long) (now - sn->last_up),
+		       sn->ref, sn->hostname);
+	}
+	g_mutex_unlock(tabled_srv.bigmutex);
+}
+
diff --git a/server/tabled.h b/server/tabled.h
index fd6142e..e0c58b2 100644
--- a/server/tabled.h
+++ b/server/tabled.h
@@ -19,9 +19,9 @@
  *
  */
 
-
 #include <stdbool.h>
 #include <stdlib.h>
+#include <time.h>
 #include <netinet/in.h>
 #include <openssl/md5.h>
 #include <glib.h>
@@ -43,6 +43,9 @@ enum {
 	TABLED_PGSZ_LOCK	= 4096,
 
 	TABLED_CHKPT_SEC	= 60 * 5,	/* secs between db4 chkpt */
+	TABLED_RESCAN_SEC	= 60*3 + 7,	/* secs btw key rescans */
+
+	CHUNK_REBOOT_TIME	= 3*60,		/* secs to declare chunk dead */
 
 	CLI_REQ_BUF_SZ		= 8192,		/* buffer for req + hdrs */
 	CLI_DATA_BUF_SZ		= 8192,
@@ -89,13 +92,14 @@ struct storage_node {
 	struct list_head	all_link;
 	uint32_t		id;
 	bool			up;
+	time_t			last_up;
 
 	unsigned		alen;
 	int			addr_af;
 	struct sockaddr_in6	addr;
 	char *hostname;		/* Only used because stc_new is overly smart. */
 
-	int nchu;		/* number of open_chunk */
+	int ref;		/* number of open_chunk or other */
 };
 
 typedef bool (*cli_evt_func)(struct client *, unsigned int);
@@ -115,7 +119,8 @@ struct open_chunk {
 	struct st_client	*stc;
 	struct storage_node	*node;
 	struct list_head	link;
-	struct client		*cli;
+	void			*cli;	/* usually struct client * */
+	struct event_base	*evbase;
 
 	uint64_t		wtogo;
 	uint64_t		wkey;
@@ -217,6 +222,8 @@ struct listen_cfg {
 struct server {
 	unsigned long		flags;		/* SFL_xxx above */
 	int			pid_fd;		/* fd of pid_file */
+	GMutex			*bigmutex;
+	struct event_base	*evbase_main;
 
 	char			*config;	/* config file (static) */
 
@@ -318,7 +325,10 @@ extern int stor_update_cb(void);
 extern void read_config(void);
 
 /* storage.c */
-extern int stor_open(struct open_chunk *cep, struct storage_node *stn);
+extern struct storage_node *stor_node_get(struct storage_node *stn);
+extern void stor_node_put(struct storage_node *stn);
+extern int stor_open(struct open_chunk *cep, struct storage_node *stn,
+		     struct event_base *ev_base);
 extern int stor_open_read(struct open_chunk *cep,
 			  void (*cb)(struct open_chunk *),
 			  uint64_t key, uint64_t *psz);
@@ -336,8 +346,14 @@ extern struct storage_node *stor_node_by_nid(uint32_t nid);
 extern void stor_add_node(uint32_t nid, const char *hostname,
 			  const char *portstr, struct geo *locp);
 extern int stor_node_check(struct storage_node *stn);
+extern void stor_stats(void);
 
 /* storparse.c */
 extern void stor_parse(char *fname, const char *text, size_t len);
 
+/* replica.c */
+extern void rep_init(struct event_base *ev_base);
+extern void rep_start(void);
+extern void rep_stats(void);
+
 #endif /* __TABLED_H__ */
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html

[Index of Archives]     [Fedora Clound]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux