[Patch 5/7] tabled: Add replication daemon

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This patch adds what amounts to a background process that maintains
redundancy for object data. It is far from the complete solution.
For one thing, it does not verify checksums. But it's a start.

There's no way to turn this off, by intention. The whole thing must
work very reliably, not steal too much from benchmarks (but if it
does, it's only honest to take the hit). It is indispensable.
However, there's a plan to add useful monitoring of jobs and other
state, such as available nodes.

Signed-off-by: Pete Zaitcev <zaitcev@xxxxxxxxxx>

---
 server/Makefile.am |    4 
 server/replica.c   |  753 +++++++++++++++++++++++++++++++++++++++++++
 server/server.c    |   25 +
 server/tabled.h    |   12 
 4 files changed, 789 insertions(+), 5 deletions(-)

diff --git a/server/Makefile.am b/server/Makefile.am
index f994b36..8faa95a 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -4,8 +4,8 @@ INCLUDES	= -I$(top_srcdir)/include @GLIB_CFLAGS@ @CHUNKDC_CFLAGS@ @CLDC_CFLAGS@
 sbin_PROGRAMS	= tabled tdbadm
 
 tabled_SOURCES	= tabled.h		\
-		  bucket.c object.c server.c storage.c storparse.c cldu.c \
-		  config.c util.c
+		  bucket.c object.c server.c storage.c storparse.c replica.c \
+		  cldu.c config.c util.c
 tabled_LDADD	= ../lib/libhttputil.a ../lib/libtdb.a		\
 		  @CHUNKDC_LIBS@ @CLDC_LIBS@ @PCRE_LIBS@ @GLIB_LIBS@ \
 		  @CRYPTO_LIBS@ @DB4_LIBS@ @EVENT_LIBS@ @ARGP_LIBS@ @SSL_LIBS@
diff --git a/server/replica.c b/server/replica.c
new file mode 100644
index 0000000..ff814da
--- /dev/null
+++ b/server/replica.c
@@ -0,0 +1,753 @@
+
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING.  If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define _GNU_SOURCE
+#include "tabled-config.h"
+#include <sys/types.h>
+#include <string.h>
+#include <syslog.h>
+#include <time.h>
+#include <db.h>
+#include <elist.h>
+#include "tabled.h"
+
+/*
+ * Replication Job
+ */
+struct rep_job {
+	struct list_head jlink;
+
+	uint64_t oid;
+	uint64_t size;		/* all of the object */
+	time_t start_time;
+	/* cannot look up by oid, keep key */
+	size_t klen;
+	struct db_obj_key *key;
+
+	struct storage_node *src, *dst;
+	struct open_chunk in_ce, out_ce;
+	long in_len;		/* can MIN() take long long? */
+	char *buf;
+	char *bptr;		/* points into buf */
+	ssize_t bcnt;		/* currently in buf */
+};
+
+struct rep_jobs {
+	int njobs;
+	struct list_head jlist;
+};
+
+static struct rep_jobs active = { 0, LIST_HEAD_INIT(active.jlist) };
+static struct rep_jobs queue = { 0, LIST_HEAD_INIT(queue.jlist) };
+static struct rep_jobs done = { 0, LIST_HEAD_INIT(done.jlist) };
+
+static void job_dispatch(void);
+
+/* should've called this job_alloc_and_fill actually */
+static struct rep_job *job_alloc(size_t klen, struct db_obj_key *key)
+{
+	struct rep_job *job;
+	size_t len;
+
+	len = sizeof(struct rep_job) + klen;
+	job = malloc(len);
+	if (job) {
+		memset(job, 0, sizeof(struct rep_job));
+		memcpy(job+1, key, klen);
+		job->klen = klen;
+		job->key = (struct db_obj_key *)(job+1);
+	}
+	return job;
+}
+
+static void job_free(struct rep_job *job)
+{
+	free(job->buf);
+	free(job);
+}
+
+/* N.B. the current calling convention is to wait for drain on the socket */
+static void job_done(struct rep_job *job)
+{
+/* P3 */ applog(LOG_INFO, "job done oid %llX", (long long) job->oid);
+	if (!stor_put_end(&job->out_ce)) {
+		applog(LOG_ERR, "Chunk sync failed on nid %u", job->dst->id);
+	}
+	stor_close(&job->out_ce);
+	stor_close(&job->in_ce);
+
+	list_del(&job->jlink);
+	--active.njobs;
+
+	list_add(&job->jlink, &done.jlist);
+}
+
+static void job_abend(struct rep_job *job)
+{
+/* P3 */ applog(LOG_INFO, "job abend from %u to %u oid %llX",
+  job->src->id, job->dst->id, (long long) job->oid);
+	stor_abort(&job->out_ce);
+	stor_close(&job->out_ce);
+	stor_close(&job->in_ce);
+
+	list_del(&job->jlink);
+	--active.njobs;
+	job_free(job);
+}
+
+static int job_submit_buf(struct rep_job *job, char *buf, ssize_t len)
+{
+	ssize_t bytes;
+
+	job->bptr = buf;
+	job->bcnt = len;
+
+	bytes = stor_put_buf(&job->out_ce, job->bptr, job->bcnt);
+	if (bytes < 0) {
+		job->bcnt = 0;
+		if (debugging)
+			applog(LOG_DEBUG, "stor_put_buf failed (%d)", bytes);
+		return bytes;
+	}
+	job->bptr += bytes;
+	job->bcnt -= bytes;
+	return 0;
+}
+
+static void job_get_poke(struct rep_job *job)
+{
+	ssize_t bytes;
+
+	for (;;) {
+		if (job->bcnt != 0)
+			break;
+		if (!job->in_len)
+			break;
+		bytes = stor_get_buf(&job->in_ce, job->buf,
+				     MIN(job->in_len, CLI_DATA_BUF_SZ));
+		if (bytes < 0) {
+			applog(LOG_ERR, "read failed oid %llX at nid %u",
+			       (unsigned long long) job->oid, job->src->id);
+			goto err_out;
+		}
+		if (bytes == 0)
+			break;
+		if (job_submit_buf(job, job->buf, bytes))
+			goto err_out;
+		job->in_len -= bytes;
+	}
+
+	if (job->bcnt == 0 && job->in_len == 0) {
+		job_done(job);
+		return;
+	}
+
+	/*
+	 * Since storage events automatically arm and disarm themselves,
+	 * we can just return to the main loop without a fear of looping.
+	 */
+	return;
+
+err_out:
+	job_abend(job);
+	return;
+}
+
+static void job_get_event(struct open_chunk *stp)
+{
+	job_get_poke(stp->cli);
+
+	job_dispatch();
+}
+
+/* push pending buffer bytes to the destination; refill from source when drained */
+static void job_put_poke(struct rep_job *job)
+{
+	ssize_t bytes;
+
+	bytes = stor_put_buf(&job->out_ce, job->bptr, job->bcnt);
+	if (bytes < 0) {
+		job->bcnt = 0;
+		/* the failed write went to the destination, not the source */
+		applog(LOG_ERR, "write failed oid %llX at nid %u",
+		       (unsigned long long) job->oid, job->dst->id);
+		job_abend(job);
+		return;
+	}
+	job->bptr += bytes;
+	job->bcnt -= bytes;
+
+	if (!job->bcnt)
+		job_get_poke(job);
+}
+
+static void job_put_event(struct open_chunk *stp)
+{
+	struct rep_job *job = stp->cli;
+
+	if (job->bcnt) {
+		job_put_poke(job);
+	} else {
+		job_get_poke(job);
+	}
+
+	job_dispatch();
+}
+
+/* well, not much scheduling for now, just throw to the tail of the queue. */
+static int job_schedule(struct rep_job *job)
+{
+
+	job->start_time = time(NULL);
+
+	/* P3 */ applog(LOG_INFO, "job oid %llX start %lu from %u to %u",
+	    (unsigned long long) job->oid, (long)job->start_time,
+	    job->src->id, job->dst->id);
+
+	list_add(&job->jlink, &queue.jlist);
+	queue.njobs++;
+	return 0;
+}
+
+/* FIXME needs to loop while active.njobs < max or something */
+static void job_dispatch()
+{
+	struct rep_job *job;
+	uint64_t objsize;	/* As reported by Chunk. Not used. */
+	int rc;
+
+	if (active.njobs >= 2)	/* FIXME: Bogus. Need to know current loads. */
+		return;
+
+	if (list_empty(&queue.jlist))
+		return;
+	job = list_entry(queue.jlist.next, struct rep_job, jlink);
+	list_del(&job->jlink);
+	--queue.njobs;
+
+	job->buf = malloc(CLI_DATA_BUF_SZ);
+	if (!job->buf)
+		goto err_malloc;
+
+	rc = stor_open(&job->in_ce, job->src);
+	if (rc) {
+		applog(LOG_WARNING, "Cannot open input chunk, nid %u (%d)",
+		       job->src->id, rc);
+		goto err_inopen;
+	}
+	job->in_ce.cli = job;
+
+	rc = stor_open(&job->out_ce, job->dst);
+	if (rc) {
+		applog(LOG_WARNING, "Cannot open output chunk, nid %u (%d)",
+		       job->dst->id, rc);
+		goto err_outopen;
+	}
+	job->out_ce.cli = job;
+
+	rc = stor_open_read(&job->in_ce, job_get_event, job->oid, &objsize);
+	if (rc) {
+		applog(LOG_ERR, "Cannot start nid %u for oid %llX (%d)",
+		       job->src->id, (unsigned long long) job->oid, rc);
+		goto err_read;
+	}
+	job->in_len = job->size;
+
+	rc = stor_put_start(&job->out_ce, job_put_event, job->oid, job->size);
+	if (rc) {
+		applog(LOG_ERR, "Cannot start putting, nid %u (%d)",
+		       job->dst->id, rc);
+		goto err_put;
+	}
+
+	list_add(&job->jlink, &active.jlist);
+	active.njobs++;
+
+	job_get_poke(job);	/* required to start */
+
+	return;
+
+err_put:
+err_read:
+	stor_close(&job->out_ce);
+err_outopen:
+	stor_close(&job->in_ce);
+err_inopen:
+	/* no free(buf) since job_free does it */
+err_malloc:
+	job_free(job);
+	return;
+}
+
+static struct storage_node *job_select_src(int nnum,
+					   struct storage_node *nvec[])
+{
+	if (nnum == 0)
+		return NULL;
+	return nvec[rand() % nnum];
+}
+
+/* FIXME Need to select by capacity and load. Ditto in initial selection. */
+static struct storage_node *job_select_dst(int nnum,
+					   struct storage_node *nvec[])
+{
+	enum { NRAND = 20 };
+	struct storage_node *tmp[NRAND];
+	int n, i;
+	struct storage_node *stn;
+	time_t t1;
+
+	t1 = time(NULL);
+	n = 0;
+	list_for_each_entry(stn, &tabled_srv.all_stor, all_link) {
+		if (!stn->up)
+			continue;
+		if (t1 > stn->last_up + CHUNK_REBOOT_TIME)
+			continue;
+
+		/* de-dup with source */
+		for (i = 0; i < nnum; i++) {
+			if (nvec[i] == stn)
+				break;
+		}
+		if (i < nnum)
+			continue;
+
+		tmp[n] = stn;
+		n++;
+		if (n >= NRAND)	/* don't overrun tmp[] on farms > NRAND nodes */
+			break;
+	}
+	if (n == 0)
+		return NULL;
+	return tmp[rand() % n];
+}
+
+static struct rep_job *job_find_by_oid(uint64_t oid)
+{
+	struct rep_job *pos;
+
+	list_for_each_entry(pos, &queue.jlist, jlink) {
+		if (pos->oid == oid)
+			return pos;
+	}
+	list_for_each_entry(pos, &active.jlist, jlink) {
+		if (pos->oid == oid)
+			return pos;
+	}
+	return NULL;
+}
+
+/* start replicating the key somewhere */
+static void rep_job_start(size_t klen, struct db_obj_key *key,
+			  uint64_t oid, uint64_t objsize,
+			  int nnum, struct storage_node *nvec[])
+{
+	struct rep_job *job;
+
+	if (objsize == 0) {
+		static int cnt = 10;
+		if (cnt > 0) {	/* internal error; if it ever hits, it floods */
+			--cnt;
+			applog(LOG_ERR, "Submitting oid %llX with zero size",
+			       (long long) oid);
+		}
+		return;
+	}
+	if (job_find_by_oid(oid) != NULL)
+		return;
+	job = job_alloc(klen, key);
+	if (!job)
+		goto err_alloc;
+	job->oid = oid;
+	job->size = objsize;
+	job->src = job_select_src(nnum, nvec);
+	if (!job->src) {
+		/* P3 */ applog(LOG_INFO, "no src oid %llX", (long long) oid);
+		goto err_src;
+	}
+	job->dst = job_select_dst(nnum, nvec);
+	if (!job->dst) {
+		/* P3 */ applog(LOG_INFO, "no dst oid %llX", (long long) oid);
+		goto err_dst;
+	}
+	if (job->src->id == job->dst->id) {
+		/* Is this bad enough to invoke exit(1) right here? */
+		applog(LOG_ERR, "Internal error, copy from/to nid %u",
+		       job->src->id);
+		goto err_dst;	/* a bare return here would leak the job */
+	}
+	if (job_schedule(job) != 0)
+		goto err_sched;
+	job_dispatch();
+	return;
+
+err_sched:
+err_dst:
+err_src:
+	job_free(job);
+err_alloc:
+	return;
+}
+
+/*
+ * rep_scan() and friends
+ * Read the whole db of keys, replicate those below redundancy.
+ */
+
+struct cursor {		/* our own "soft" cursor, works across transactions */
+	size_t klen;	/* zero possible, means no key */
+	struct db_obj_key *key;
+	DB_ENV *db_env;
+	DB     *db_objs;
+	DB_TXN *db_txn;
+	DBC    *db_cur;
+};
+
+static int rep_scan_open(struct cursor *cp)
+{
+	int rc;
+
+	rc = cp->db_env->txn_begin(cp->db_env, NULL, &cp->db_txn, 0);
+	if (rc) {
+		cp->db_env->err(cp->db_env, rc, "DB_ENV->txn_begin");
+		goto err_none;
+	}
+
+	// DB_WRITECURSOR ?  DB_BULK ?
+	rc = cp->db_objs->cursor(cp->db_objs, cp->db_txn, &cp->db_cur, 0);
+	if (rc) {
+		cp->db_objs->err(cp->db_objs, rc, "objs->cursor");
+		goto err_out;
+	}
+
+	return 0;
+
+err_out:
+	rc = cp->db_txn->abort(cp->db_txn);
+	if (rc)
+		cp->db_env->err(cp->db_env, rc, "DB_ENV->txn_abort");
+err_none:
+	return -1;
+}
+
+static void rep_scan_close(struct cursor *cp)
+{
+	int rc;
+
+	rc = cp->db_cur->close(cp->db_cur);
+	if (rc) {
+		cp->db_objs->err(cp->db_objs, rc, "objs->cursor close");
+		goto err_out;
+	}
+	cp->db_cur = NULL;
+
+	rc = cp->db_txn->commit(cp->db_txn, 0);
+	if (rc)
+		cp->db_env->err(cp->db_env, rc, "DB_ENV->txn_commit");
+	cp->db_txn = NULL;
+	return;
+
+err_out:
+	rc = cp->db_txn->abort(cp->db_txn);
+	if (rc)
+		cp->db_env->err(cp->db_env, rc, "DB_ENV->txn_abort");
+	return;
+}
+
+/* get next */
+static int rep_scan_get(struct cursor *cp, struct db_obj_ent **pobj)
+{
+	unsigned int get_flags;
+	DBT pkey, pval;
+	int rc;
+
+	if (cp->db_cur) {
+		get_flags = DB_NEXT;
+	} else {
+		if (rep_scan_open(cp) != 0)
+			return -1;
+		get_flags = DB_SET_RANGE;
+	}
+
+	memset(&pkey, 0, sizeof(pkey));
+	pkey.data = cp->key;
+	pkey.size = cp->klen;
+
+	memset(&pval, 0, sizeof(pval));
+	pval.flags = DB_DBT_MALLOC;
+
+	rc = cp->db_cur->get(cp->db_cur, &pkey, &pval, get_flags);
+	if (rc) {
+		if (rc != DB_NOTFOUND)
+			cp->db_objs->err(cp->db_objs, rc, "cur->get for keys");
+		return -1;
+	}
+
+	*pobj = pval.data;
+	return 0;
+}
+
+/* parse object into cursor state */
+static int rep_scan_parse(struct cursor *cp, struct db_obj_ent *obj)
+{
+	unsigned int obj_koff, obj_klen;
+	struct db_obj_key *okey;
+
+	obj_klen = GUINT16_FROM_LE(*(uint16_t *)(obj+1));
+	if (obj_klen >= 64*1024) {	/* byteswapped or corrupt */
+		applog(LOG_ERR, "bad key length %d", obj_klen);
+		return -1;
+	}
+	obj_koff = obj->n_str * sizeof(uint16_t);
+
+	okey = malloc(64 + obj_klen);
+	if (!okey)	/* was dereferenced unchecked before */
+		return -1;
+
+	memcpy(okey->bucket, obj->bucket, 64);
+	memcpy(okey->key, (char *)(obj+1) + obj_koff, obj_klen);
+
+	free(cp->key);
+	cp->key = okey;
+	cp->klen = 64 + obj_klen;
+	return 0;
+}
+
+/* meat of scan - check if replication is need on the key */
+static void rep_scan_verify(struct cursor *cp, struct db_obj_ent *obj)
+{
+	char bucket_name[65];
+	char object_name[1025];
+	uint64_t oid;
+	int i;
+	struct storage_node *redvec[MAXWAY];
+	int allcnt, redcnt;
+	uint32_t nid;
+	struct storage_node *stn;
+	time_t t1;
+
+	/*
+	 * rep_scan_parse() only bounds keys below 64K, which can still
+	 * overrun object_name[]; clamp here instead of smashing the stack.
+	 */
+	if (cp->klen < 64 || cp->klen - 64 > sizeof(object_name) - 1) {
+		applog(LOG_ERR, "bad cursor key length %zu", cp->klen);
+		return;
+	}
+
+	memcpy(bucket_name, cp->key->bucket, 64);
+	bucket_name[64] = 0;
+	memcpy(object_name, cp->key->key, cp->klen - 64);
+	object_name[cp->klen - 64] = 0;
+
+	t1 = time(NULL);
+
+	allcnt = 0;
+	redcnt = 0;
+	for (i = 0; i < MAXWAY; i++) {
+		nid = GUINT32_FROM_LE(obj->d.a.nidv[i]);
+		if (!nid)
+			continue;
+		stn = stor_node_by_nid(nid);
+		if (!stn)
+			continue;
+		allcnt++;
+		if (!stn->up)
+			continue;
+		if (t1 > stn->last_up + CHUNK_REBOOT_TIME)
+			continue;
+		/*
+		 * This is where we later ask chunks for checksums (TODO).
+		 */
+
+		redvec[redcnt] = stn;
+		redcnt++;
+	}
+
+	oid = GUINT64_FROM_LE(obj->d.a.oid);
+
+	applog(LOG_INFO, "bucket %s key %s oid %llX n(%u,%u,%u): all %d ok %d",
+	       bucket_name, object_name, (long long) oid,
+	       GUINT32_FROM_LE(obj->d.a.nidv[0]),
+	       GUINT32_FROM_LE(obj->d.a.nidv[1]),
+	       GUINT32_FROM_LE(obj->d.a.nidv[2]),
+	       allcnt, redcnt);
+
+	if (redcnt < MAXWAY) {		/* maybe have MINWAY too? */
+		rep_job_start(cp->klen, cp->key, oid,
+			      GUINT64_FROM_LE(obj->size),
+			      redcnt, redvec);
+	}
+}
+
+static void rep_add_nid(unsigned int klen, struct db_obj_key *key, uint32_t nid)
+{
+	DB_ENV *db_env = tdb.env;
+	DB *db_objs = tdb.objs;
+	DB_TXN *db_txn;
+	DBT pkey, pval;
+	struct db_obj_ent *obj;
+	ssize_t oelen;
+	unsigned empty;
+	uint32_t n;
+	int i;
+	int rc;
+
+	rc = db_env->txn_begin(db_env, NULL, &db_txn, 0);
+	if (rc) {
+		db_env->err(db_env, rc, "DB_ENV->txn_begin");
+		goto err_none;
+	}
+
+	memset(&pkey, 0, sizeof(pkey));
+	pkey.data = key;
+	pkey.size = klen;
+
+	memset(&pval, 0, sizeof(pval));
+	pval.flags = DB_DBT_MALLOC;
+
+	rc = db_objs->get(db_objs, db_txn, &pkey, &pval, DB_RMW);
+	if (rc) {
+		db_env->err(db_env, rc, "objs->get");
+		goto err_get;
+	}
+
+	obj = pval.data;
+	oelen = pval.size;
+
+	memset(&pkey, 0, sizeof(pkey));
+	pkey.data = key;
+	pkey.size = klen;
+
+	rc = db_objs->del(db_objs, db_txn, &pkey, 0);
+	if (rc) {
+		db_objs->err(db_objs, rc, "objs->del");
+		goto err_del;
+	}
+
+	empty = 0;
+	for (i = 0; i < MAXWAY; i++) {
+		n = GUINT32_FROM_LE(obj->d.a.nidv[i]);
+		if (n && n == nid) {
+			applog(LOG_WARNING,
+			       "object %llX already has nid %u",
+			       (long long) GUINT64_FROM_LE(obj->d.a.oid), nid);
+			goto err_check;
+		}
+		if (!n)
+			empty++;
+	}
+	if (!empty) {
+		applog(LOG_WARNING,
+		      "object %llX already fully redundant, dropping nid %u",
+		       (long long) GUINT64_FROM_LE(obj->d.a.oid), nid);
+		goto err_check;
+	}
+
+	for (i = 0; i < MAXWAY; i++) {
+		if (!obj->d.a.nidv[i]) {
+			obj->d.a.nidv[i] = GUINT32_TO_LE(nid);
+			break;
+		}
+	}
+
+	memset(&pkey, 0, sizeof(pkey));
+	pkey.data = key;
+	pkey.size = klen;
+
+	memset(&pval, 0, sizeof(pval));
+	pval.data = obj;
+	pval.size = oelen;
+
+	rc = db_objs->put(db_objs, db_txn, &pkey, &pval, 0);
+	if (rc) {
+		db_env->err(db_env, rc, "objs->put");
+		goto err_put;
+	}
+
+	free(obj);
+
+	rc = db_txn->commit(db_txn, 0);
+	if (rc) {
+		db_env->err(db_env, rc, "DB_ENV->txn_commit");
+	}
+	return;
+
+err_put:
+err_check:
+err_del:
+	free(obj);
+err_get:
+	rc = db_txn->abort(db_txn);
+	if (rc)
+		db_env->err(db_env, rc, "DB_ENV->txn_abort");
+err_none:
+	return;
+}
+
+static void rep_retire(void)
+{
+	struct rep_job *job;
+
+	while (!list_empty(&done.jlist)) {
+		job = list_entry(done.jlist.next, struct rep_job, jlink);
+		list_del(&job->jlink);
+
+		rep_add_nid(job->klen, job->key, job->dst->id);
+		job_free(job);
+	}
+}
+
+void rep_scan(void)
+{
+	struct cursor cur;
+	struct db_obj_ent *obj;
+	unsigned long kcnt;
+	time_t start_time, t;
+
+	rep_retire();
+
+	start_time = time(NULL);
+	if (debugging)
+		applog(LOG_DEBUG, "key scan start time %lu", (long)start_time);
+
+	memset(&cur, 0, sizeof(struct cursor));	/* enough to construct */
+	cur.db_env = tdb.env;
+	cur.db_objs = tdb.objs;
+
+	kcnt = 0;
+	for (;;) {
+		if ((t = time(NULL)) >= start_time + 2) {
+			if (debugging)
+				applog(LOG_DEBUG,
+				       "db release at keys %lu seconds %lu",
+				       kcnt, (long)t);
+			rep_scan_close(&cur);
+			/* restart interval, else we close on every key */
+			start_time = t;
+		}
+
+		if (rep_scan_get(&cur, &obj) != 0)
+			break;
+
+		/* not needed for db4 with DB_NEXT, but eases our logic */
+		if (rep_scan_parse(&cur, obj) != 0) {
+			free(obj);
+			continue;
+		}
+
+		/* ! binds before &, so the flag test needs the parens */
+		if (!(GUINT32_FROM_LE(obj->flags) & DB_OBJ_INLINE))
+			rep_scan_verify(&cur, obj);
+
+		free(obj);
+		kcnt++;
+	}
+
+	rep_scan_close(&cur);
+	free(cur.key);
+	cur.key = NULL;
+
+	if (debugging)
+		applog(LOG_DEBUG, "key scan done keys %lu", kcnt);
+	return;
+}
+
diff --git a/server/server.c b/server/server.c
index 2a504e2..673c151 100644
--- a/server/server.c
+++ b/server/server.c
@@ -1318,6 +1318,20 @@ static void tdb_checkpoint(int fd, short events, void *userdata)
 	add_chkpt_timer();
 }
 
+static void add_kscan_timer(void)
+{
+	static const struct timeval tv = { TABLED_RESCAN_SEC, 0 };
+
+	if (evtimer_add(&tabled_srv.kscan_timer, &tv) < 0)
+		applog(LOG_WARNING, "unable to add key scan timer");
+}
+
+static void tdb_keyscan(int fd, short events, void *userdata)
+{
+	rep_scan();
+	add_kscan_timer();
+}
+
 static void tdb_state_cb(enum db_event event)
 {
 
@@ -1365,7 +1379,7 @@ static void tdb_state_cb(enum db_event event)
  *
  * We don't even bother with registering this callback, just call it by name. 
  *
- * The return value is used to re-arm rescan mechanism.
+ * The return value is used to re-arm storage rescan mechanism.
  */
 int stor_update_cb(void)
 {
@@ -1383,8 +1397,15 @@ int stor_update_cb(void)
 				applog(LOG_DEBUG, " NID %u is up", stn->id);
 			num_up++;
 			stn->up = true;
+			stn->last_up = time(NULL);
+		} else {
+			if (stn->last_up != 0 &&
+			    time(NULL) >= stn->last_up + CHUNK_REBOOT_TIME) {
+				applog(LOG_INFO, " NID %u went down", stn->id);
+			}
 		}
 	}
+
 	if (num_up < 1) {
 		applog(LOG_INFO, "No active storage node(s), waiting");
 		return num_up;
@@ -1695,6 +1716,7 @@ static void tdb_state_process(enum st_tdb new_state)
 			return;
 		}
 		add_chkpt_timer();
+		add_kscan_timer();
 		net_listen();
 	}
 }
@@ -1772,6 +1794,7 @@ int main (int argc, char *argv[])
 
 	event_init();
 	evtimer_set(&tabled_srv.chkpt_timer, tdb_checkpoint, NULL);
+	evtimer_set(&tabled_srv.kscan_timer, tdb_keyscan, NULL);
 
 	/* set up server networking */
 	rc = net_open();
diff --git a/server/tabled.h b/server/tabled.h
index fd6142e..31cead2 100644
--- a/server/tabled.h
+++ b/server/tabled.h
@@ -19,9 +19,9 @@
  *
  */
 
-
 #include <stdbool.h>
 #include <stdlib.h>
+#include <time.h>
 #include <netinet/in.h>
 #include <openssl/md5.h>
 #include <glib.h>
@@ -43,6 +43,9 @@ enum {
 	TABLED_PGSZ_LOCK	= 4096,
 
 	TABLED_CHKPT_SEC	= 60 * 5,	/* secs between db4 chkpt */
+	TABLED_RESCAN_SEC	= 60*3 + 7,	/* secs btw key rescans */
+
+	CHUNK_REBOOT_TIME	= 3*60,		/* secs to declare chunk dead */
 
 	CLI_REQ_BUF_SZ		= 8192,		/* buffer for req + hdrs */
 	CLI_DATA_BUF_SZ		= 8192,
@@ -89,6 +92,7 @@ struct storage_node {
 	struct list_head	all_link;
 	uint32_t		id;
 	bool			up;
+	time_t			last_up;
 
 	unsigned		alen;
 	int			addr_af;
@@ -115,7 +119,7 @@ struct open_chunk {
 	struct st_client	*stc;
 	struct storage_node	*node;
 	struct list_head	link;
-	struct client		*cli;
+	void			*cli;	/* usually struct client * */
 
 	uint64_t		wtogo;
 	uint64_t		wkey;
@@ -241,6 +245,7 @@ struct server {
 	enum st_net		state_net;
 
 	struct event		chkpt_timer;	/* db4 checkpoint timer */
+	struct event		kscan_timer;	/* db4 key rescan timer */
 
 	struct server_stats	stats;		/* global statistics */
 };
@@ -340,4 +345,7 @@ extern int stor_node_check(struct storage_node *stn);
 /* storparse.c */
 extern void stor_parse(char *fname, const char *text, size_t len);
 
+/* replica.c */
+extern void rep_scan(void);
+
 #endif /* __TABLED_H__ */
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html

[Index of Archives]     [Fedora Clound]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux