[Patch 1/1] tabled: switch to ncld

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



No new functionality yet; this patch only switches server/cldu.c over from the cldc API to ncld.

Signed-Off-By: Pete Zaitcev <zaitcev@xxxxxxxxxx>

---
 server/cldu.c |  789 +++++++++++++-----------------------------------
 1 file changed, 215 insertions(+), 574 deletions(-)

diff --git a/server/cldu.c b/server/cldu.c
index 7e176d4..aecf336 100644
--- a/server/cldu.c
+++ b/server/cldu.c
@@ -29,8 +29,8 @@
 #include <unistd.h>
 #include <event.h>
 #include <errno.h>
-#include <cldc.h>
 #include <elist.h>
+#include <ncld.h>
 #include "tabled.h"
 
 #define ALIGN8(n)	((8 - ((n) & 7)) & 7)
@@ -49,60 +49,38 @@ struct cld_host {
 
 struct cld_session {
 	bool forced_hosts;		/* Administrator overrode default CLD */
-	bool sess_open;
-	struct cldc_udp *lib;		/* library state */
-	struct event lib_timer;
-	int retry_cnt;
-	int last_recv_err;
+	bool is_dead;
+	struct ncld_sess *nsp;		/* library state */
 
 	/*
 	 * For code sanity and being isomorphic with conventional programming
 	 * using sleep(), neither of the timers must ever be active simultane-
 	 * ously with any other. But using one timer structure is too annoying.
 	 */
-	struct event tm_retry;
+	// struct event tm_relock;
 	struct event tm_rescan;
-	struct event tm_reopen;
 
 	int actx;		/* Active host cldv[actx] */
 	struct cld_host cldv[N_CLD];
 
 	char *thisgroup;
 	char *thishost;
-	struct event ev;	/* Associated with fd */
 	char *cfname;		/* /tabled-group directory */
-	struct cldc_fh *cfh;	/* /tabled-group directory, keep open for scan */
+	struct ncld_fh *cfh;	/* /tabled-group directory, keep open for scan */
 	char *ffname;		/* /tabled-group/thishost */
-	struct cldc_fh *ffh;	/* /tabled-group/thishost, keep open for lock */
+	struct ncld_fh *ffh;	/* /tabled-group/thishost, keep open for lock */
 	char *xfname;		/* /chunk-GROUP directory */
-	struct cldc_fh *xfh;	/* /chunk-GROUP directory */
-	char *yfname;		/* /chunk-GROUP/NID file */
-	struct cldc_fh *yfh;	/* /chunk-GROUP/NID file */
 
 	struct list_head chunks;	/* found in xfname, struct chunk_node */
 };
 
 static int cldu_set_cldc(struct cld_session *sp, int newactive);
-static int cldu_new_sess(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_open_c_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_open_f_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_lock_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_put_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static void try_open_x(struct cld_session *sp);
-static int cldu_open_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_get_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_close_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static void next_chunk(struct cld_session *sp);
-static int cldu_open_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_get_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_close_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
+static int scan_chunks(struct cld_session *sp);
+static void next_chunk(struct cld_session *sp, struct chunk_node *np);
 static void add_remote(const char *name);
 static void add_chunk_node(struct cld_session *sp, const char *name);
 
-static struct timeval cldu_retry_delay = { 5, 0 };
 static struct timeval cldu_rescan_delay = { 50, 0 };
-static struct timeval cldu_reopen_delay = { 3, 0 };
 
 struct hail_log cldu_hail_log = {
 	.func		= applog,
@@ -169,170 +147,68 @@ err_oom:
 	return 0;
 }
 
-static void cldu_tm_retry(int fd, short events, void *userdata)
-{
-	struct cld_session *sp = userdata;
-
-	if (++sp->retry_cnt >= 5) {
-		applog(LOG_INFO, "Out of retries for %s, bailing", sp->xfname);
-		exit(1);
-	}
-	if (debugging)
-		applog(LOG_DEBUG, "Trying to open %s", sp->xfname);
-	try_open_x(sp);
-}
-
 static void cldu_tm_rescan(int fd, short events, void *userdata)
 {
 	struct cld_session *sp = userdata;
+	int newactive;
 
 	/* Add rescanning for tabled nodes as well. FIXME */
 	if (debugging)
 		applog(LOG_DEBUG, "Rescanning for Chunks in %s", sp->xfname);
-	try_open_x(sp);
-}
-
-static void cldu_tm_reopen(int fd, short events, void *userdata)
-{
-	struct cld_session *sp = userdata;
-
-	if (debugging)
-		applog(LOG_DEBUG, "Trying to reopen %d storage nodes",
-		       tabled_srv.num_stor);
-	if (stor_update_cb() < 1)
-		evtimer_add(&sp->tm_reopen, &cldu_reopen_delay);
-}
 
-static void cldu_event(int fd, short events, void *userdata)
-{
-	struct cld_session *sp = userdata;
-	int rc;
-
-	if (!sp->lib) {
-		applog(LOG_WARNING, "Stray UDP event");
-		return;
-	}
-
-	rc = cldc_udp_receive_pkt(sp->lib);
-	if (rc) {
-		if (rc != sp->last_recv_err) {
-			if (rc < -1000)		/* our internal code */
-				applog(LOG_INFO,
-				       "cldc_udp_receive_pkt failed: %d", rc);
-			else
-				applog(LOG_INFO,
-				       "cldc_udp_receive_pkt failed: %s",
-				       strerror(-rc));
-			sp->last_recv_err = rc;
-		}
-		/*
-		 * Reacting to ICMP messages is a bad idea, because
-		 *  - it makes us loop hard in case CLD is down, unless we
-		 *    insert additional tricky timeouts
-		 *  - it deals poorly with transient problems like CLD reboots
-		 */
-#if 0
-		if (rc == -ECONNREFUSED) {	/* ICMP tells us */
-			int newactive;
-			// evtimer_del(&sp->tm);
-			cldc_kill_sess(sp->lib->sess);
-			sp->lib->sess = NULL;
-			newactive = cldu_nextactive(sp);
-			if (cldu_set_cldc(sp, newactive))
-				return;
-			// evtimer_add(&sp->tm, &cldc_to_delay);
+	if (sp->is_dead) {
+		ncld_sess_close(sp->nsp);
+		sp->nsp = NULL;
+		sp->is_dead = 0;
+		newactive = cldu_nextactive(sp);
+		if (cldu_set_cldc(sp, newactive)) {
+			evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
+			return;
 		}
-		return;
-#endif
-	}
-}
-
-static bool cldu_p_timer_ctl(void *priv, bool add,
-			     int (*cb)(struct cldc_session *, void *),
-			     void *cb_priv, time_t secs)
-{
-	struct cld_session *sp = priv;
-	struct cldc_udp *udp = sp->lib;
-	struct timeval tv = { secs, 0 };
-
-	if (add) {
-		udp->cb = cb;
-		udp->cb_private = cb_priv;
-		return evtimer_add(&sp->lib_timer, &tv) == 0;
-	} else {
-		return evtimer_del(&sp->lib_timer) == 0;
 	}
-}
 
-static int cldu_p_pkt_send(void *priv, const void *addr, size_t addrlen,
-			       const void *buf, size_t buflen)
-{
-	struct cld_session *sp = priv;
-	return cldc_udp_pkt_send(sp->lib, addr, addrlen, buf, buflen);
+	scan_chunks(sp);
+	evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
 }
 
-static void cldu_udp_timer_event(int fd, short events, void *userdata)
-
-{
-	struct cld_session *sp = userdata;
-	struct cldc_udp *udp = sp->lib;
-
-	if (udp->cb)
-		udp->cb(udp->sess, udp->cb_private);
-}
-
-static void cldu_p_event(void *priv, struct cldc_session *csp,
-			 struct cldc_fh *fh, uint32_t what)
+static void cldu_sess_event(void *priv, uint32_t what)
 {
 	struct cld_session *sp = priv;
-	int newactive;
 
 	if (what == CE_SESS_FAILED) {
-		sp->sess_open = false;
-		if (sp->lib->sess != csp)
-			applog(LOG_ERR, "Stray session failed, sid " SIDFMT,
-			       SIDARG(csp->sid));
-		else
-			applog(LOG_ERR, "Session failed, sid " SIDFMT,
-			       SIDARG(csp->sid));
-		// evtimer_del(&sp->tm);
-		sp->lib->sess = NULL;
-		newactive = cldu_nextactive(sp);
-		if (cldu_set_cldc(sp, newactive))
-			return;
-		// evtimer_add(&sp->tm, &cldc_to_delay);
+		applog(LOG_ERR, "Session failed, sid " SIDFMT,
+			       SIDARG(sp->nsp->udp->sess->sid));
+		sp->is_dead = true;
 	} else {
-		if (csp)
+		if (sp->nsp)
 			applog(LOG_INFO, "cldc event 0x%x sid " SIDFMT,
-			       what, SIDARG(csp->sid));
+			       what, SIDARG(sp->nsp->udp->sess->sid));
 		else
 			applog(LOG_INFO, "cldc event 0x%x no sid", what);
 	}
 }
 
-static struct cldc_ops cld_ops = {
-	.timer_ctl =	cldu_p_timer_ctl,
-	.pkt_send =	cldu_p_pkt_send,
-	.event =	cldu_p_event,
-	.errlog =	applog,
-};
-
 /*
- * Open the library, start its session, and reguster its socket with libevent.
+ * Open the library, start its session, pre-open files, and set timers.
  * Our session remains consistent in case of an error in this function,
  * so that we can continue and retry meaningfuly.
  */
 static int cldu_set_cldc(struct cld_session *sp, int newactive)
 {
 	struct cldc_host *hp;
-	struct cldc_udp *lib;
-	struct cldc_call_opts copts;
+	struct ncld_read *nrp;
+	char buf[100];
+	const char *ptr;
+	int dir_len;
+	int total_len, rec_len, name_len;
+	int len;
+	struct timespec tm;
+	int error;
 	int rc;
 
-	if (sp->lib) {
-		event_del(&sp->ev);
-		cldc_udp_free(sp->lib);
-		sp->lib = NULL;
+	if (sp->nsp) {
+		ncld_sess_close(sp->nsp);
+		sp->nsp = NULL;
 	}
 
 	sp->actx = newactive;
@@ -342,105 +218,36 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
 	}
 	hp = &sp->cldv[sp->actx].h;
 
-	evtimer_set(&sp->lib_timer, cldu_udp_timer_event, sp);
-
-	rc = cldc_udp_new(hp->host, hp->port, &sp->lib);
-	if (rc) {
-		applog(LOG_ERR, "cldc_udp_new(%s,%u) error: %d",
-		       hp->host, hp->port, rc);
-		goto err_lib_new;
-	}
-	lib = sp->lib;
-
 	if (debugging)
 		applog(LOG_INFO, "Selected CLD host %s port %u",
 		       hp->host, hp->port);
 
-	/*
-	 * This is a little iffy: we assume that it's ok to re-issue
-	 * event_set() for an event that was unregistered with event_del().
-	 * In any case, there's no other way to set the file descriptor.
-	 */
-	event_set(&sp->ev, sp->lib->fd, EV_READ | EV_PERSIST, cldu_event, sp);
-
-	if (event_add(&sp->ev, NULL) < 0) {
-		applog(LOG_INFO, "Failed to add CLD event");
-		goto err_event;
-	}
-
-	memset(&copts, 0, sizeof(struct cldc_call_opts));
-	copts.cb = cldu_new_sess;
-	copts.private = sp;
-	rc = cldc_new_sess(&cld_ops, &copts, lib->addr, lib->addr_len,
-			   "tabled", "tabled", sp, &lib->sess);
-	if (rc) {
-		applog(LOG_INFO,
-		       "Failed to start CLD session on host %s port %u",
-		       hp->host, hp->port);
-		goto err_sess;
-	}
-
-	// if (debugging)
-	//	lib->sess->verbose = true;
-
-	return 0;
-
-err_sess:
-err_event:
-	cldc_udp_free(sp->lib);
-	sp->lib = NULL;
-err_lib_new:
-err_addr:
-	return -1;
-}
-
-static int cldu_new_sess(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	struct cldc_call_opts copts;
-	int rc;
-
-	if (errc != CLE_OK) {
-		applog(LOG_INFO, "New CLD session creation failed: %d", errc);
-		return 0;
+	sp->nsp = ncld_sess_open(hp->host, hp->port, &error,
+				 cldu_sess_event, sp, "tabled", "tabled");
+	if (sp->nsp == NULL) {
+		if (error < 1000) {
+			applog(LOG_ERR, "ncld_sess_open(%s,%u) error: %s",
+			       hp->host, hp->port, strerror(error));
+		} else {
+			applog(LOG_ERR, "ncld_sess_open(%s,%u) error: %d",
+			       hp->host, hp->port, error);
+		}
+		goto err_nsess;
 	}
 
-	sp->sess_open = true;
 	applog(LOG_INFO, "New CLD session created, sid " SIDFMT,
-	       SIDARG(sp->lib->sess->sid));
+	       SIDARG(sp->nsp->udp->sess->sid));
 
 	/*
 	 * First, make sure the base directory exists.
 	 */
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_open_c_cb;
-	copts.private = sp;
-	rc = cldc_open(sp->lib->sess, &copts, sp->cfname,
-		       COM_READ | COM_WRITE | COM_CREATE | COM_DIRECTORY,
-		       CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->cfh);
-	if (rc) {
-		applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->cfname, rc);
-	}
-	return 0;
-}
-
-static int cldu_open_c_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	struct cldc_call_opts copts;
-	int rc;
-
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD open(%s) failed: %d", sp->cfname, errc);
-		return 0;
-	}
-	if (sp->cfh == NULL) {
-		applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->cfname);
-		return 0;
-	}
-	if (!sp->cfh->valid) {
-		applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->cfname);
-		return 0;
+	sp->cfh = ncld_open(sp->nsp, sp->cfname,
+			    COM_READ | COM_WRITE | COM_CREATE | COM_DIRECTORY,
+			    &error, 0 /* CE_MASTER_FAILOVER | CE_SESS_FAILED */,
+			    NULL, NULL);
+	if (!sp->cfh) {
+		applog(LOG_ERR, "CLD open(%s) failed: %d", sp->cfname, error);
+		goto err_copen;
 	}
 
 	if (debugging)
@@ -449,65 +256,38 @@ static int cldu_open_c_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
 	/*
 	 * Then, create the membership file for us.
 	 */
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_open_f_cb;
-	copts.private = sp;
-	rc = cldc_open(sp->lib->sess, &copts, sp->ffname,
-		       COM_WRITE | COM_LOCK | COM_CREATE,
-		       CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->ffh);
-	if (rc) {
-		applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->ffname, rc);
-	}
-	return 0;
-}
-
-static int cldu_open_f_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	struct cldc_call_opts copts;
-	int rc;
-
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD open(%s) failed: %d", sp->ffname, errc);
-		return 0;
-	}
-	if (sp->ffh == NULL) {
-		applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->ffname);
-		return 0;
-	}
-	if (!sp->ffh->valid) {
-		applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->ffname);
-		return 0;
+	sp->ffh = ncld_open(sp->nsp, sp->ffname,
+			    COM_WRITE | COM_LOCK | COM_CREATE,
+			    &error, 0, NULL, NULL);
+	if (!sp->ffh) {
+		applog(LOG_ERR, "CLD open(%s) failed: %d", sp->ffname, error);
+		goto err_fopen;
 	}
 
 	if (debugging)
 		applog(LOG_DEBUG, "CLD file \"%s\" created", sp->ffname);
 
-	/*
-	 * Lock the file, in case two hosts got the same hostname.
-	 */
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_lock_cb;
-	copts.private = sp;
-	rc = cldc_lock(sp->ffh, &copts, 0, false);
-	if (rc) {
-		applog(LOG_ERR, "cldc_lock call error %d", rc);
-	}
+	for (;;) {
+		rc = ncld_trylock(sp->ffh);
+		if (!rc)
+			break;
 
-	return 0;
-}
+		applog(LOG_ERR, "CLD lock(%s) failed: %d", sp->ffname, rc);
+		if (rc != CLE_LOCK_CONFLICT + 1100)
+			goto err_lock;
 
-static int cldu_lock_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	char buf[100];
-	int len;
-	struct cldc_call_opts copts;
-	int rc;
-
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD lock(%s) failed: %d", sp->ffname, errc);
-		return 0;
+		/*
+		 * The usual reason why we get a lock conflict is
+		 * restarting too quickly and hitting the previous lock
+		 * that is going to disappear soon.
+		 *
+		 * FIXME: However, it may also be that a master
+		 * is ok so we should become a slave, e.g. start TDB.
+		 * We do not support multi-node, but we should.
+		 */
+		tm.tv_sec = 10;
+		tm.tv_nsec = 0;
+		nanosleep(&tm, NULL);
 	}
 
 	/*
@@ -515,65 +295,30 @@ static int cldu_lock_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
 	 */
 	len = snprintf(buf, sizeof(buf), "port: %u\n", tabled_srv.rep_port);
 	if (len >= sizeof(buf)) {
-		applog(LOG_ERR,
-		       "internal error: overflow in cldu_lock_cb (%d)", len);
-		return 0;
+		applog(LOG_ERR, "internal error: overflow for port (%d)", len);
+		goto err_wmem;
 	}
 
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_put_cb;
-	copts.private = sp;
-	rc = cldc_put(sp->ffh, &copts, buf, len);
+	rc = ncld_write(sp->ffh, buf, len);
 	if (rc) {
-		applog(LOG_ERR, "cldc_put(%s) call error: %d", sp->ffname, rc);
-	}
-
-	return 0;
-}
-
-static int cldu_put_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	struct cldc_call_opts copts;
-	int rc;
-
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD put(%s) failed: %d", sp->ffname, errc);
-		return 0;
+		applog(LOG_ERR, "CLD put(%s) failed: %d", sp->ffname, rc);
+		goto err_write;
 	}
 
 	/*
 	 * Read the directory.
 	 */
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_get_1_cb;
-	copts.private = sp;
-	rc = cldc_get(sp->cfh, &copts, false);
-	if (rc) {
-		applog(LOG_ERR, "cldc_get(%s) call error: %d", sp->cfname, rc);
-	}
-
-	return 0;
-}
-
-static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	const char *ptr;
-	int dir_len;
-	int total_len, rec_len, name_len;
-	char buf[65];
-
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD get(%s) failed: %d", sp->cfname, errc);
-		return 0;
+	nrp = ncld_get(sp->cfh, &error);
+	if (!nrp) {
+		applog(LOG_ERR, "CLD get(%s) failed: %d", sp->cfname, error);
+		goto err_dread;
 	}
 
 	if (debugging)
 		applog(LOG_DEBUG, "Known tabled nodes");
 
-	ptr = carg->u.get.buf;
-	dir_len = carg->u.get.size;
+	ptr = nrp->ptr;
+	dir_len = nrp->length;
 	while (dir_len) {
 		name_len = GUINT16_FROM_LE(*(uint16_t *)ptr);
 		rec_len = name_len + 2;
@@ -598,6 +343,8 @@ static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
 		dir_len -= total_len;
 	}
 
+	ncld_read_free(nrp);
+
 	/*
 	 * If configuration gives us storage nodes, we shortcut scanning
 	 * of CLD, because:
@@ -609,96 +356,78 @@ static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
 		if (debugging)
 			applog(LOG_DEBUG, "Trying to open %d storage nodes",
 			       tabled_srv.num_stor);
-		if (stor_update_cb() < 1) {
-			evtimer_add(&sp->tm_reopen, &cldu_reopen_delay);
+		while (stor_update_cb() < 1) {
+			tm.tv_sec = 3;
+			tm.tv_nsec = 0;
+			nanosleep(&tm, NULL);
+			if (debugging)
+				applog(LOG_DEBUG,
+				       "Trying to reopen %d storage nodes",
+				       tabled_srv.num_stor);
 		}
 		return 0;
 	}
 
-	sp->retry_cnt = 0;
-	try_open_x(sp);
 	return 0;
-}
-
-/*
- * Open the xfname, so we can collect registered Chunk servers.
- */
-static void try_open_x(struct cld_session *sp)
-{
-	struct cldc_call_opts copts;
-	int rc;
 
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_open_x_cb;
-	copts.private = sp;
-	rc = cldc_open(sp->lib->sess, &copts, sp->xfname,
-		       COM_READ | COM_DIRECTORY,
-		       CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->xfh);
-	if (rc) {
-		applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->xfname, rc);
-	}
+err_dread:
+err_write:
+err_wmem:
+err_lock:
+	ncld_close(sp->ffh);	/* session-close closes these, maybe drop */
+err_fopen:
+	ncld_close(sp->cfh);
+err_copen:
+	ncld_sess_close(sp->nsp);
+	sp->nsp = NULL;
+err_nsess:
+err_addr:
+	return -1;
 }
 
-static int cldu_open_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
+static int scan_chunks(struct cld_session *sp)
 {
-	struct cld_session *sp = carg->private;
-	struct cldc_call_opts copts;
-	int rc;
-
-	if (errc != CLE_OK) {
-		if (errc == CLE_INODE_INVAL || errc == CLE_NAME_INVAL) {
+	struct ncld_fh *xfh;	/* /chunk-GROUP directory */
+	struct ncld_read *nrp;
+	struct chunk_node *np;
+	const char *ptr;
+	int dir_len;
+	int total_len, rec_len, name_len;
+	char buf[65];
+	int error;
+
+	xfh = ncld_open(sp->nsp, sp->xfname, COM_READ | COM_DIRECTORY,
+			&error, 0 /* CE_MASTER_FAILOVER | CE_SESS_FAILED */,
+			NULL, NULL);
+	if (!xfh) {
+		if (error == CLE_INODE_INVAL + 1100 ||
+		    error == CLE_NAME_INVAL + 1100) {
 			applog(LOG_ERR, "%s: open failed, retrying",
 			       sp->xfname);
-			evtimer_add(&sp->tm_retry, &cldu_retry_delay);
+			return 1;
 		} else {
 			applog(LOG_ERR, "CLD open(%s) failed: %d",
-			       sp->xfname, errc);
+			       sp->xfname, error);
 			/* XXX we're dead, why not exit(1) right away? */
+			return -1;
 		}
-		return 0;
-	}
-	if (sp->xfh == NULL) {
-		applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->xfname);
-		return 0;
-	}
-	if (!sp->xfh->valid) {
-		applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->xfname);
-		return 0;
 	}
 
 	/*
 	 * Read the directory.
 	 */
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_get_x_cb;
-	copts.private = sp;
-	rc = cldc_get(sp->xfh, &copts, false);
-	if (rc) {
-		applog(LOG_ERR, "cldc_get(%s) call error: %d", sp->cfname, rc);
-	}
-	return 0;
-}
-
-static int cldu_get_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	struct cldc_call_opts copts;
-	int rc;
-	const char *ptr;
-	int dir_len;
-	int total_len, rec_len, name_len;
-	char buf[65];
-
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD get(%s) failed: %d", sp->xfname, errc);
-		return 0;
+	nrp = ncld_get(xfh, &error);
+	if (!nrp) {
+		ncld_close(xfh);
+		applog(LOG_ERR, "CLD get(%s) failed: %d", sp->xfname, error);
+		return -1;
 	}
 
 	if (debugging)
 		applog(LOG_DEBUG, "Known Chunk nodes");
 
-	ptr = carg->u.get.buf;
-	dir_len = carg->u.get.size;
+	ptr = nrp->ptr;
+	dir_len = nrp->length;
 	while (dir_len) {
 		name_len = GUINT16_FROM_LE(*(uint16_t *)ptr);
 		rec_len = name_len + 2;
@@ -718,176 +447,82 @@ static int cldu_get_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
 		dir_len -= total_len;
 	}
 
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_close_x_cb;
-	copts.private = sp;
-	rc = cldc_close(sp->xfh, &copts);
-	if (rc) {
-		applog(LOG_ERR, "cldc_close call error %d", rc);
-	}
-	return 0;
-}
-
-static int cldu_close_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	// struct cldc_call_opts copts;
-	// int rc;
-
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD close(%s) failed: %d", sp->xfname, errc);
-		return 0;
-	}
+	ncld_read_free(nrp);
+	ncld_close(xfh);
 
+	/*
+	 * Scan the collected directory contents and fill the entries.
+	 */
 	if (list_empty(&sp->chunks)) {
 		applog(LOG_INFO, "%s: No Chunk nodes found, retrying",
 		       sp->xfname);
-		if (evtimer_add(&sp->tm_retry, &cldu_retry_delay) != 0) {
-			applog(LOG_ERR, "evtimer_add error %s",
-			       strerror(errno));
-		}
-	} else {
-		next_chunk(sp);
+		return 1;
 	}
-	return 0;
-}
-
-static void next_chunk(struct cld_session *sp)
-{
-	struct chunk_node *np;
-	char *mem;
-	struct cldc_call_opts copts;
-	int rc;
-
-	np = list_entry(sp->chunks.next, struct chunk_node, link);
-
-	if (asprintf(&mem, "/chunk-%s/%s", sp->thisgroup, np->name) == -1) {
-		applog(LOG_WARNING, "OOM in cldu");
-		return;
-	}
-	sp->yfname = mem;
-
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_open_y_cb;
-	copts.private = sp;
-	rc = cldc_open(sp->lib->sess, &copts, sp->yfname,
-		       COM_READ,
-		       CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->yfh);
-	if (rc) {
-		applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->yfname, rc);
-	}
-}
-
-static int cldu_open_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	struct cldc_call_opts copts;
-	int rc;
-
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD open(%s) failed: %d", sp->yfname, errc);
-		free(sp->yfname);
-		sp->yfname = NULL;
-		return 0;
-	}
-	if (sp->yfh == NULL) {
-		applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->yfname);
-		free(sp->yfname);
-		sp->yfname = NULL;
-		return 0;
-	}
-	if (!sp->yfh->valid) {
-		applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->yfname);
-		free(sp->yfname);
-		sp->yfname = NULL;
-		return 0;
+	while (!list_empty(&sp->chunks)) {
+		np = list_entry(sp->chunks.next, struct chunk_node, link);
+		next_chunk(sp, np);
+		list_del(&np->link);
 	}
 
 	/*
-	 * Read the Chunk's parameter file.
+	 * Poke the dispatch about the possible changes in the
+	 * configuration of Chunk.
+	 *
+	 * It's possible that the CLD directories have many entries,
+	 * but no useable Chunk servers. In that case, treat everything
+	 * like a usual retry.
+	 *
+	 * For the case of normal operation, we also set up a rescan, for now.
+	 * In the future, we'll subscribe for change notification. FIXME.
 	 */
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_get_y_cb;
-	copts.private = sp;
-	rc = cldc_get(sp->yfh, &copts, false);
-	if (rc) {
-		applog(LOG_ERR, "cldc_get(%s) call error: %d", sp->yfname, rc);
-	}
-	return 0;
-}
+	if (!stor_update_cb())
+		return 1;
 
-static int cldu_get_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	struct cldc_call_opts copts;
-	int rc;
-	const char *ptr;
-	int len;
-
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD get(%s) failed: %d", sp->yfname, errc);
-		goto close_and_next;	/* spaghetti */
-	}
-
-	ptr = carg->u.get.buf;
-	len = carg->u.get.size;
-	stor_parse(sp->yfname, ptr, len);
-
-close_and_next:
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_close_y_cb;
-	copts.private = sp;
-	rc = cldc_close(sp->yfh, &copts);
-	if (rc) {
-		applog(LOG_ERR, "cldc_close call error %d", rc);
-	}
 	return 0;
 }
 
-static int cldu_close_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
+static void next_chunk(struct cld_session *sp, struct chunk_node *np)
 {
-	struct cld_session *sp = carg->private;
-	struct chunk_node *np;
-	// struct cldc_call_opts copts;
-	// int rc;
+	char *mem;
+	char *yfname;		/* /chunk-GROUP/NID file */
+	struct ncld_fh *yfh;	/* /chunk-GROUP/NID file */
+	struct ncld_read *nrp;
+	int error;
 
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD close(%s) failed: %d", sp->yfname, errc);
-		return 0;
+	if (asprintf(&mem, "/chunk-%s/%s", sp->thisgroup, np->name) == -1) {
+		applog(LOG_WARNING, "OOM in cldu");
+		goto err_mem;
 	}
+	yfname = mem;
 
-	free(sp->yfname);
-	sp->yfname = NULL;
-
-	np = list_entry(sp->chunks.next, struct chunk_node, link);
-	list_del(&np->link);
-
-	if (!list_empty(&sp->chunks)) {
-		next_chunk(sp);
-		return 0;
+	yfh = ncld_open(sp->nsp, yfname, COM_READ, &error,
+			0 /* CE_MASTER_FAILOVER | CE_SESS_FAILED */,
+			NULL, NULL);
+	if (!yfh) {
+		applog(LOG_ERR, "CLD open(%s) failed: %d", yfname, error);
+		goto err_open;
 	}
 
 	/*
-	 * No more chunks to consider in this cycle, we're all done.
-	 * Now, poke the dispatch about the possible changes in the
-	 * configuration of Chunk.
-	 *
-	 * It's possible that the CLD directories are full of all garbage,
-	 * but no useable Chunk servers. In that case, treat everything
-	 * like a usual retry.
-	 *
-	 * For the case of normal operation, we also set up a rescan, for now.
-	 * In the future, we'll subscribe for change notification. FIXME.
+	 * Read the Chunk's parameter file.
 	 */
-	if (stor_update_cb()) {
-		evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
-	} else {
-		if (evtimer_add(&sp->tm_retry, &cldu_retry_delay) != 0) {
-			applog(LOG_ERR, "evtimer_add error %s",
-			       strerror(errno));
-		}
-	}
-	return 0;
+	nrp = ncld_get(yfh, &error);
+	if (!nrp) {
+		applog(LOG_ERR, "CLD get(%s) failed: %d", yfname, error);
+		goto err_get;
+	}
+	stor_parse(yfname, nrp->ptr, nrp->length);
+	ncld_read_free(nrp);
+	ncld_close(yfh);
+	free(yfname);
+	return;
+
+err_get:
+	ncld_close(yfh);
+err_open:
+	free(yfname);
+err_mem:
+	return;
 }
 
 /*
@@ -937,7 +572,7 @@ static struct cld_session ses;
  */
 void cld_init()
 {
-	cldc_init();
+	ncld_init();
 
 	// memset(&ses, 0, sizeof(struct cld_session));
 	INIT_LIST_HEAD(&ses.chunks);
@@ -949,10 +584,10 @@ void cld_init()
 int cld_begin(const char *thishost, const char *thisgroup)
 {
 	static struct cld_session *sp = &ses;
+	struct timespec tm;
+	int retry_cnt;
 
-	evtimer_set(&ses.tm_retry, cldu_tm_retry, &ses);
 	evtimer_set(&ses.tm_rescan, cldu_tm_rescan, &ses);
-	evtimer_set(&ses.tm_reopen, cldu_tm_reopen, &ses);
 
 	if (cldu_setgroup(sp, thisgroup, thishost)) {
 		/* Already logged error */
@@ -999,8 +634,21 @@ int cld_begin(const char *thishost, const char *thisgroup)
 		goto err_net;
 	}
 
+	retry_cnt = 0;
+	for (;;) {
+		if (!scan_chunks(sp))
+			break;
+		if (++retry_cnt == 5)
+			goto err_scan;
+		tm.tv_sec = 5;
+		tm.tv_nsec = 0;
+		nanosleep(&tm, NULL);
+	}
+
+	evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
 	return 0;
 
+err_scan:
 err_net:
 err_addr:
 err_group:
@@ -1034,12 +682,9 @@ void cld_end(void)
 	static struct cld_session *sp = &ses;
 	int i;
 
-	if (sp->lib) {
-		event_del(&sp->ev);
-		// if (sp->sess_open)	/* kill it always, include half-open */
-		cldc_kill_sess(sp->lib->sess);
-		cldc_udp_free(sp->lib);
-		sp->lib = NULL;
+	if (sp->nsp) {
+		ncld_sess_close(sp->nsp);
+		sp->nsp = NULL;
 	}
 
 	if (!sp->forced_hosts) {
@@ -1051,9 +696,7 @@ void cld_end(void)
 		}
 	}
 
-	evtimer_del(&sp->tm_retry);
 	evtimer_del(&sp->tm_rescan);
-	evtimer_del(&sp->tm_reopen);
 
 	free(sp->cfname);
 	sp->cfname = NULL;
@@ -1061,8 +704,6 @@ void cld_end(void)
 	sp->ffname = NULL;
 	free(sp->xfname);
 	sp->xfname = NULL;
-	free(sp->yfname);
-	sp->yfname = NULL;
 	free(sp->thisgroup);
 	sp->thisgroup = NULL;
 	free(sp->thishost);
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html

[Index of Archives]     [Fedora Cloud]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux