No new function just yet, only a switch-over. Signed-Off-By: Pete Zaitcev <zaitcev@xxxxxxxxxx> --- server/cldu.c | 789 +++++++++++++----------------------------------- 1 file changed, 215 insertions(+), 574 deletions(-) diff --git a/server/cldu.c b/server/cldu.c index 7e176d4..aecf336 100644 --- a/server/cldu.c +++ b/server/cldu.c @@ -29,8 +29,8 @@ #include <unistd.h> #include <event.h> #include <errno.h> -#include <cldc.h> #include <elist.h> +#include <ncld.h> #include "tabled.h" #define ALIGN8(n) ((8 - ((n) & 7)) & 7) @@ -49,60 +49,38 @@ struct cld_host { struct cld_session { bool forced_hosts; /* Administrator overrode default CLD */ - bool sess_open; - struct cldc_udp *lib; /* library state */ - struct event lib_timer; - int retry_cnt; - int last_recv_err; + bool is_dead; + struct ncld_sess *nsp; /* library state */ /* * For code sanity and being isomorphic with conventional programming * using sleep(), neither of the timers must ever be active simultane- * ously with any other. But using one timer structure is too annoying. 
*/ - struct event tm_retry; + // struct event tm_relock; struct event tm_rescan; - struct event tm_reopen; int actx; /* Active host cldv[actx] */ struct cld_host cldv[N_CLD]; char *thisgroup; char *thishost; - struct event ev; /* Associated with fd */ char *cfname; /* /tabled-group directory */ - struct cldc_fh *cfh; /* /tabled-group directory, keep open for scan */ + struct ncld_fh *cfh; /* /tabled-group directory, keep open for scan */ char *ffname; /* /tabled-group/thishost */ - struct cldc_fh *ffh; /* /tabled-group/thishost, keep open for lock */ + struct ncld_fh *ffh; /* /tabled-group/thishost, keep open for lock */ char *xfname; /* /chunk-GROUP directory */ - struct cldc_fh *xfh; /* /chunk-GROUP directory */ - char *yfname; /* /chunk-GROUP/NID file */ - struct cldc_fh *yfh; /* /chunk-GROUP/NID file */ struct list_head chunks; /* found in xfname, struct chunk_node */ }; static int cldu_set_cldc(struct cld_session *sp, int newactive); -static int cldu_new_sess(struct cldc_call_opts *carg, enum cle_err_codes errc); -static int cldu_open_c_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); -static int cldu_open_f_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); -static int cldu_lock_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); -static int cldu_put_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); -static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); -static void try_open_x(struct cld_session *sp); -static int cldu_open_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); -static int cldu_get_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); -static int cldu_close_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); -static void next_chunk(struct cld_session *sp); -static int cldu_open_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); -static int cldu_get_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc); -static int cldu_close_y_cb(struct cldc_call_opts 
*carg, enum cle_err_codes errc); +static int scan_chunks(struct cld_session *sp); +static void next_chunk(struct cld_session *sp, struct chunk_node *np); static void add_remote(const char *name); static void add_chunk_node(struct cld_session *sp, const char *name); -static struct timeval cldu_retry_delay = { 5, 0 }; static struct timeval cldu_rescan_delay = { 50, 0 }; -static struct timeval cldu_reopen_delay = { 3, 0 }; struct hail_log cldu_hail_log = { .func = applog, @@ -169,170 +147,68 @@ err_oom: return 0; } -static void cldu_tm_retry(int fd, short events, void *userdata) -{ - struct cld_session *sp = userdata; - - if (++sp->retry_cnt >= 5) { - applog(LOG_INFO, "Out of retries for %s, bailing", sp->xfname); - exit(1); - } - if (debugging) - applog(LOG_DEBUG, "Trying to open %s", sp->xfname); - try_open_x(sp); -} - static void cldu_tm_rescan(int fd, short events, void *userdata) { struct cld_session *sp = userdata; + int newactive; /* Add rescanning for tabled nodes as well. FIXME */ if (debugging) applog(LOG_DEBUG, "Rescanning for Chunks in %s", sp->xfname); - try_open_x(sp); -} - -static void cldu_tm_reopen(int fd, short events, void *userdata) -{ - struct cld_session *sp = userdata; - - if (debugging) - applog(LOG_DEBUG, "Trying to reopen %d storage nodes", - tabled_srv.num_stor); - if (stor_update_cb() < 1) - evtimer_add(&sp->tm_reopen, &cldu_reopen_delay); -} -static void cldu_event(int fd, short events, void *userdata) -{ - struct cld_session *sp = userdata; - int rc; - - if (!sp->lib) { - applog(LOG_WARNING, "Stray UDP event"); - return; - } - - rc = cldc_udp_receive_pkt(sp->lib); - if (rc) { - if (rc != sp->last_recv_err) { - if (rc < -1000) /* our internal code */ - applog(LOG_INFO, - "cldc_udp_receive_pkt failed: %d", rc); - else - applog(LOG_INFO, - "cldc_udp_receive_pkt failed: %s", - strerror(-rc)); - sp->last_recv_err = rc; - } - /* - * Reacting to ICMP messages is a bad idea, because - * - it makes us loop hard in case CLD is down, unless we - * 
insert additional tricky timeouts - * - it deals poorly with transient problems like CLD reboots - */ -#if 0 - if (rc == -ECONNREFUSED) { /* ICMP tells us */ - int newactive; - // evtimer_del(&sp->tm); - cldc_kill_sess(sp->lib->sess); - sp->lib->sess = NULL; - newactive = cldu_nextactive(sp); - if (cldu_set_cldc(sp, newactive)) - return; - // evtimer_add(&sp->tm, &cldc_to_delay); + if (sp->is_dead) { + ncld_sess_close(sp->nsp); + sp->nsp = NULL; + sp->is_dead = 0; + newactive = cldu_nextactive(sp); + if (cldu_set_cldc(sp, newactive)) { + evtimer_add(&sp->tm_rescan, &cldu_rescan_delay); + return; } - return; -#endif - } -} - -static bool cldu_p_timer_ctl(void *priv, bool add, - int (*cb)(struct cldc_session *, void *), - void *cb_priv, time_t secs) -{ - struct cld_session *sp = priv; - struct cldc_udp *udp = sp->lib; - struct timeval tv = { secs, 0 }; - - if (add) { - udp->cb = cb; - udp->cb_private = cb_priv; - return evtimer_add(&sp->lib_timer, &tv) == 0; - } else { - return evtimer_del(&sp->lib_timer) == 0; } -} -static int cldu_p_pkt_send(void *priv, const void *addr, size_t addrlen, - const void *buf, size_t buflen) -{ - struct cld_session *sp = priv; - return cldc_udp_pkt_send(sp->lib, addr, addrlen, buf, buflen); + scan_chunks(sp); + evtimer_add(&sp->tm_rescan, &cldu_rescan_delay); } -static void cldu_udp_timer_event(int fd, short events, void *userdata) - -{ - struct cld_session *sp = userdata; - struct cldc_udp *udp = sp->lib; - - if (udp->cb) - udp->cb(udp->sess, udp->cb_private); -} - -static void cldu_p_event(void *priv, struct cldc_session *csp, - struct cldc_fh *fh, uint32_t what) +static void cldu_sess_event(void *priv, uint32_t what) { struct cld_session *sp = priv; - int newactive; if (what == CE_SESS_FAILED) { - sp->sess_open = false; - if (sp->lib->sess != csp) - applog(LOG_ERR, "Stray session failed, sid " SIDFMT, - SIDARG(csp->sid)); - else - applog(LOG_ERR, "Session failed, sid " SIDFMT, - SIDARG(csp->sid)); - // evtimer_del(&sp->tm); - 
sp->lib->sess = NULL; - newactive = cldu_nextactive(sp); - if (cldu_set_cldc(sp, newactive)) - return; - // evtimer_add(&sp->tm, &cldc_to_delay); + applog(LOG_ERR, "Session failed, sid " SIDFMT, + SIDARG(sp->nsp->udp->sess->sid)); + sp->is_dead = true; } else { - if (csp) + if (sp->nsp) applog(LOG_INFO, "cldc event 0x%x sid " SIDFMT, - what, SIDARG(csp->sid)); + what, SIDARG(sp->nsp->udp->sess->sid)); else applog(LOG_INFO, "cldc event 0x%x no sid", what); } } -static struct cldc_ops cld_ops = { - .timer_ctl = cldu_p_timer_ctl, - .pkt_send = cldu_p_pkt_send, - .event = cldu_p_event, - .errlog = applog, -}; - /* - * Open the library, start its session, and reguster its socket with libevent. + * Open the library, start its session, pre-open files, and set timers. * Our session remains consistent in case of an error in this function, * so that we can continue and retry meaningfuly. */ static int cldu_set_cldc(struct cld_session *sp, int newactive) { struct cldc_host *hp; - struct cldc_udp *lib; - struct cldc_call_opts copts; + struct ncld_read *nrp; + char buf[100]; + const char *ptr; + int dir_len; + int total_len, rec_len, name_len; + int len; + struct timespec tm; + int error; int rc; - if (sp->lib) { - event_del(&sp->ev); - cldc_udp_free(sp->lib); - sp->lib = NULL; + if (sp->nsp) { + ncld_sess_close(sp->nsp); + sp->nsp = NULL; } sp->actx = newactive; @@ -342,105 +218,36 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive) } hp = &sp->cldv[sp->actx].h; - evtimer_set(&sp->lib_timer, cldu_udp_timer_event, sp); - - rc = cldc_udp_new(hp->host, hp->port, &sp->lib); - if (rc) { - applog(LOG_ERR, "cldc_udp_new(%s,%u) error: %d", - hp->host, hp->port, rc); - goto err_lib_new; - } - lib = sp->lib; - if (debugging) applog(LOG_INFO, "Selected CLD host %s port %u", hp->host, hp->port); - /* - * This is a little iffy: we assume that it's ok to re-issue - * event_set() for an event that was unregistered with event_del(). 
- * In any case, there's no other way to set the file descriptor. - */ - event_set(&sp->ev, sp->lib->fd, EV_READ | EV_PERSIST, cldu_event, sp); - - if (event_add(&sp->ev, NULL) < 0) { - applog(LOG_INFO, "Failed to add CLD event"); - goto err_event; - } - - memset(&copts, 0, sizeof(struct cldc_call_opts)); - copts.cb = cldu_new_sess; - copts.private = sp; - rc = cldc_new_sess(&cld_ops, &copts, lib->addr, lib->addr_len, - "tabled", "tabled", sp, &lib->sess); - if (rc) { - applog(LOG_INFO, - "Failed to start CLD session on host %s port %u", - hp->host, hp->port); - goto err_sess; - } - - // if (debugging) - // lib->sess->verbose = true; - - return 0; - -err_sess: -err_event: - cldc_udp_free(sp->lib); - sp->lib = NULL; -err_lib_new: -err_addr: - return -1; -} - -static int cldu_new_sess(struct cldc_call_opts *carg, enum cle_err_codes errc) -{ - struct cld_session *sp = carg->private; - struct cldc_call_opts copts; - int rc; - - if (errc != CLE_OK) { - applog(LOG_INFO, "New CLD session creation failed: %d", errc); - return 0; + sp->nsp = ncld_sess_open(hp->host, hp->port, &error, + cldu_sess_event, sp, "tabled", "tabled"); + if (sp->nsp == NULL) { + if (error < 1000) { + applog(LOG_ERR, "ncld_sess_open(%s,%u) error: %s", + hp->host, hp->port, strerror(error)); + } else { + applog(LOG_ERR, "ncld_sess_open(%s,%u) error: %d", + hp->host, hp->port, error); + } + goto err_nsess; } - sp->sess_open = true; applog(LOG_INFO, "New CLD session created, sid " SIDFMT, - SIDARG(sp->lib->sess->sid)); + SIDARG(sp->nsp->udp->sess->sid)); /* * First, make sure the base directory exists. 
*/ - memset(&copts, 0, sizeof(copts)); - copts.cb = cldu_open_c_cb; - copts.private = sp; - rc = cldc_open(sp->lib->sess, &copts, sp->cfname, - COM_READ | COM_WRITE | COM_CREATE | COM_DIRECTORY, - CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->cfh); - if (rc) { - applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->cfname, rc); - } - return 0; -} - -static int cldu_open_c_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) -{ - struct cld_session *sp = carg->private; - struct cldc_call_opts copts; - int rc; - - if (errc != CLE_OK) { - applog(LOG_ERR, "CLD open(%s) failed: %d", sp->cfname, errc); - return 0; - } - if (sp->cfh == NULL) { - applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->cfname); - return 0; - } - if (!sp->cfh->valid) { - applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->cfname); - return 0; + sp->cfh = ncld_open(sp->nsp, sp->cfname, + COM_READ | COM_WRITE | COM_CREATE | COM_DIRECTORY, + &error, 0 /* CE_MASTER_FAILOVER | CE_SESS_FAILED */, + NULL, NULL); + if (!sp->cfh) { + applog(LOG_ERR, "CLD open(%s) failed: %d", sp->cfname, error); + goto err_copen; } if (debugging) @@ -449,65 +256,38 @@ static int cldu_open_c_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) /* * Then, create the membership file for us. 
*/ - memset(&copts, 0, sizeof(copts)); - copts.cb = cldu_open_f_cb; - copts.private = sp; - rc = cldc_open(sp->lib->sess, &copts, sp->ffname, - COM_WRITE | COM_LOCK | COM_CREATE, - CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->ffh); - if (rc) { - applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->ffname, rc); - } - return 0; -} - -static int cldu_open_f_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) -{ - struct cld_session *sp = carg->private; - struct cldc_call_opts copts; - int rc; - - if (errc != CLE_OK) { - applog(LOG_ERR, "CLD open(%s) failed: %d", sp->ffname, errc); - return 0; - } - if (sp->ffh == NULL) { - applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->ffname); - return 0; - } - if (!sp->ffh->valid) { - applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->ffname); - return 0; + sp->ffh = ncld_open(sp->nsp, sp->ffname, + COM_WRITE | COM_LOCK | COM_CREATE, + &error, 0, NULL, NULL); + if (!sp->ffh) { + applog(LOG_ERR, "CLD open(%s) failed: %d", sp->ffname, error); + goto err_fopen; } if (debugging) applog(LOG_DEBUG, "CLD file \"%s\" created", sp->ffname); - /* - * Lock the file, in case two hosts got the same hostname. - */ - memset(&copts, 0, sizeof(copts)); - copts.cb = cldu_lock_cb; - copts.private = sp; - rc = cldc_lock(sp->ffh, &copts, 0, false); - if (rc) { - applog(LOG_ERR, "cldc_lock call error %d", rc); - } + for (;;) { + rc = ncld_trylock(sp->ffh); + if (!rc) + break; - return 0; -} + applog(LOG_ERR, "CLD lock(%s) failed: %d", sp->ffname, rc); + if (rc != CLE_LOCK_CONFLICT + 1100) + goto err_lock; -static int cldu_lock_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) -{ - struct cld_session *sp = carg->private; - char buf[100]; - int len; - struct cldc_call_opts copts; - int rc; - - if (errc != CLE_OK) { - applog(LOG_ERR, "CLD lock(%s) failed: %d", sp->ffname, errc); - return 0; + /* + * The usual reason why we get a lock conflict is + * restarting too quickly and hitting the previous lock + * that is going to disappear soon. 
+ * + * FIXME: However, it may also be that a master + * is ok and we should become a slave, e.g. start TDB. + * We do not support multi-node, but we should. + */ + tm.tv_sec = 10; + tm.tv_nsec = 0; + nanosleep(&tm, NULL); } /* @@ -515,65 +295,30 @@ static int cldu_lock_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) */ len = snprintf(buf, sizeof(buf), "port: %u\n", tabled_srv.rep_port); if (len >= sizeof(buf)) { - applog(LOG_ERR, - "internal error: overflow in cldu_lock_cb (%d)", len); - return 0; + applog(LOG_ERR, "internal error: overflow for port (%d)", len); + goto err_wmem; } - memset(&copts, 0, sizeof(copts)); - copts.cb = cldu_put_cb; - copts.private = sp; - rc = cldc_put(sp->ffh, &copts, buf, len); + rc = ncld_write(sp->ffh, buf, len); if (rc) { - applog(LOG_ERR, "cldc_put(%s) call error: %d", sp->ffname, rc); - } - - return 0; -} - -static int cldu_put_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) -{ - struct cld_session *sp = carg->private; - struct cldc_call_opts copts; - int rc; - - if (errc != CLE_OK) { - applog(LOG_ERR, "CLD put(%s) failed: %d", sp->ffname, errc); - return 0; + applog(LOG_ERR, "CLD put(%s) failed: %d", sp->ffname, rc); + goto err_write; } /* * Read the directory. 
*/ - memset(&copts, 0, sizeof(copts)); - copts.cb = cldu_get_1_cb; - copts.private = sp; - rc = cldc_get(sp->cfh, &copts, false); - if (rc) { - applog(LOG_ERR, "cldc_get(%s) call error: %d", sp->cfname, rc); - } - - return 0; -} - -static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) -{ - struct cld_session *sp = carg->private; - const char *ptr; - int dir_len; - int total_len, rec_len, name_len; - char buf[65]; - - if (errc != CLE_OK) { - applog(LOG_ERR, "CLD get(%s) failed: %d", sp->cfname, errc); - return 0; + nrp = ncld_get(sp->cfh, &error); + if (!nrp) { + applog(LOG_ERR, "CLD get(%s) failed: %d", sp->cfname, error); + goto err_dread; } if (debugging) applog(LOG_DEBUG, "Known tabled nodes"); - ptr = carg->u.get.buf; - dir_len = carg->u.get.size; + ptr = nrp->ptr; + dir_len = nrp->length; while (dir_len) { name_len = GUINT16_FROM_LE(*(uint16_t *)ptr); rec_len = name_len + 2; @@ -598,6 +343,8 @@ static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) dir_len -= total_len; } + ncld_read_free(nrp); + /* * If configuration gives us storage nodes, we shortcut scanning * of CLD, because: @@ -609,96 +356,78 @@ static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) if (debugging) applog(LOG_DEBUG, "Trying to open %d storage nodes", tabled_srv.num_stor); - if (stor_update_cb() < 1) { - evtimer_add(&sp->tm_reopen, &cldu_reopen_delay); + while (stor_update_cb() < 1) { + tm.tv_sec = 3; + tm.tv_nsec = 0; + nanosleep(&tm, NULL); + if (debugging) + applog(LOG_DEBUG, + "Trying to reopen %d storage nodes", + tabled_srv.num_stor); } return 0; } - sp->retry_cnt = 0; - try_open_x(sp); return 0; -} - -/* - * Open the xfname, so we can collect registered Chunk servers. 
- */ -static void try_open_x(struct cld_session *sp) -{ - struct cldc_call_opts copts; - int rc; - memset(&copts, 0, sizeof(copts)); - copts.cb = cldu_open_x_cb; - copts.private = sp; - rc = cldc_open(sp->lib->sess, &copts, sp->xfname, - COM_READ | COM_DIRECTORY, - CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->xfh); - if (rc) { - applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->xfname, rc); - } +err_dread: +err_write: +err_wmem: +err_lock: + ncld_close(sp->ffh); /* session-close closes these, maybe drop */ +err_fopen: + ncld_close(sp->cfh); +err_copen: + ncld_sess_close(sp->nsp); + sp->nsp = NULL; +err_nsess: +err_addr: + return -1; } -static int cldu_open_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) +static int scan_chunks(struct cld_session *sp) { - struct cld_session *sp = carg->private; - struct cldc_call_opts copts; - int rc; - - if (errc != CLE_OK) { - if (errc == CLE_INODE_INVAL || errc == CLE_NAME_INVAL) { + struct ncld_fh *xfh; /* /chunk-GROUP directory */ + struct ncld_read *nrp; + struct chunk_node *np; + const char *ptr; + int dir_len; + int total_len, rec_len, name_len; + char buf[65]; + int error; + + xfh = ncld_open(sp->nsp, sp->xfname, COM_READ | COM_DIRECTORY, + &error, 0 /* CE_MASTER_FAILOVER | CE_SESS_FAILED */, + NULL, NULL); + if (!xfh) { + if (error == CLE_INODE_INVAL + 1100 || + error == CLE_NAME_INVAL + 1100) { applog(LOG_ERR, "%s: open failed, retrying", sp->xfname); - evtimer_add(&sp->tm_retry, &cldu_retry_delay); + return 1; } else { applog(LOG_ERR, "CLD open(%s) failed: %d", - sp->xfname, errc); + sp->xfname, error); /* XXX we're dead, why not exit(1) right away? */ + return -1; } - return 0; - } - if (sp->xfh == NULL) { - applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->xfname); - return 0; - } - if (!sp->xfh->valid) { - applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->xfname); - return 0; } /* * Read the directory. 
*/ - memset(&copts, 0, sizeof(copts)); - copts.cb = cldu_get_x_cb; - copts.private = sp; - rc = cldc_get(sp->xfh, &copts, false); - if (rc) { - applog(LOG_ERR, "cldc_get(%s) call error: %d", sp->cfname, rc); - } - return 0; -} - -static int cldu_get_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) -{ - struct cld_session *sp = carg->private; - struct cldc_call_opts copts; - int rc; - const char *ptr; - int dir_len; - int total_len, rec_len, name_len; - char buf[65]; - - if (errc != CLE_OK) { - applog(LOG_ERR, "CLD get(%s) failed: %d", sp->xfname, errc); - return 0; + nrp = ncld_get(xfh, &error); + if (!nrp) { + ncld_close(xfh); + applog(LOG_ERR, "CLD get(%s) failed: %d", sp->xfname, error); + return -1; } if (debugging) applog(LOG_DEBUG, "Known Chunk nodes"); - ptr = carg->u.get.buf; - dir_len = carg->u.get.size; + ptr = nrp->ptr; + dir_len = nrp->length; while (dir_len) { name_len = GUINT16_FROM_LE(*(uint16_t *)ptr); rec_len = name_len + 2; @@ -718,176 +447,82 @@ static int cldu_get_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) dir_len -= total_len; } - memset(&copts, 0, sizeof(copts)); - copts.cb = cldu_close_x_cb; - copts.private = sp; - rc = cldc_close(sp->xfh, &copts); - if (rc) { - applog(LOG_ERR, "cldc_close call error %d", rc); - } - return 0; -} - -static int cldu_close_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) -{ - struct cld_session *sp = carg->private; - // struct cldc_call_opts copts; - // int rc; - - if (errc != CLE_OK) { - applog(LOG_ERR, "CLD close(%s) failed: %d", sp->xfname, errc); - return 0; - } + ncld_read_free(nrp); + ncld_close(xfh); + /* + * Scan the collected directory contents and fill the entries. 
+ */ if (list_empty(&sp->chunks)) { applog(LOG_INFO, "%s: No Chunk nodes found, retrying", sp->xfname); - if (evtimer_add(&sp->tm_retry, &cldu_retry_delay) != 0) { - applog(LOG_ERR, "evtimer_add error %s", - strerror(errno)); - } - } else { - next_chunk(sp); + return 1; } - return 0; -} - -static void next_chunk(struct cld_session *sp) -{ - struct chunk_node *np; - char *mem; - struct cldc_call_opts copts; - int rc; - - np = list_entry(sp->chunks.next, struct chunk_node, link); - - if (asprintf(&mem, "/chunk-%s/%s", sp->thisgroup, np->name) == -1) { - applog(LOG_WARNING, "OOM in cldu"); - return; - } - sp->yfname = mem; - - memset(&copts, 0, sizeof(copts)); - copts.cb = cldu_open_y_cb; - copts.private = sp; - rc = cldc_open(sp->lib->sess, &copts, sp->yfname, - COM_READ, - CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->yfh); - if (rc) { - applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->yfname, rc); - } -} - -static int cldu_open_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) -{ - struct cld_session *sp = carg->private; - struct cldc_call_opts copts; - int rc; - - if (errc != CLE_OK) { - applog(LOG_ERR, "CLD open(%s) failed: %d", sp->yfname, errc); - free(sp->yfname); - sp->yfname = NULL; - return 0; - } - if (sp->yfh == NULL) { - applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->yfname); - free(sp->yfname); - sp->yfname = NULL; - return 0; - } - if (!sp->yfh->valid) { - applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->yfname); - free(sp->yfname); - sp->yfname = NULL; - return 0; + while (!list_empty(&sp->chunks)) { + np = list_entry(sp->chunks.next, struct chunk_node, link); + next_chunk(sp, np); + list_del(&np->link); } /* - * Read the Chunk's parameter file. + * Poke the dispatch about the possible changes in the + * configuration of Chunk. + * + * It's possible that the CLD directories have many entries, + * but no useable Chunk servers. In that case, treat everything + * like a usual retry. 
+ * + * For the case of normal operation, we also set up a rescan, for now. + * In the future, we'll subscribe for change notification. FIXME. */ - memset(&copts, 0, sizeof(copts)); - copts.cb = cldu_get_y_cb; - copts.private = sp; - rc = cldc_get(sp->yfh, &copts, false); - if (rc) { - applog(LOG_ERR, "cldc_get(%s) call error: %d", sp->yfname, rc); - } - return 0; -} + if (!stor_update_cb()) + return 1; -static int cldu_get_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) -{ - struct cld_session *sp = carg->private; - struct cldc_call_opts copts; - int rc; - const char *ptr; - int len; - - if (errc != CLE_OK) { - applog(LOG_ERR, "CLD get(%s) failed: %d", sp->yfname, errc); - goto close_and_next; /* spaghetti */ - } - - ptr = carg->u.get.buf; - len = carg->u.get.size; - stor_parse(sp->yfname, ptr, len); - -close_and_next: - memset(&copts, 0, sizeof(copts)); - copts.cb = cldu_close_y_cb; - copts.private = sp; - rc = cldc_close(sp->yfh, &copts); - if (rc) { - applog(LOG_ERR, "cldc_close call error %d", rc); - } return 0; } -static int cldu_close_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc) +static void next_chunk(struct cld_session *sp, struct chunk_node *np) { - struct cld_session *sp = carg->private; - struct chunk_node *np; - // struct cldc_call_opts copts; - // int rc; + char *mem; + char *yfname; /* /chunk-GROUP/NID file */ + struct ncld_fh *yfh; /* /chunk-GROUP/NID file */ + struct ncld_read *nrp; + int error; - if (errc != CLE_OK) { - applog(LOG_ERR, "CLD close(%s) failed: %d", sp->yfname, errc); - return 0; + if (asprintf(&mem, "/chunk-%s/%s", sp->thisgroup, np->name) == -1) { + applog(LOG_WARNING, "OOM in cldu"); + goto err_mem; } + yfname = mem; - free(sp->yfname); - sp->yfname = NULL; - - np = list_entry(sp->chunks.next, struct chunk_node, link); - list_del(&np->link); - - if (!list_empty(&sp->chunks)) { - next_chunk(sp); - return 0; + yfh = ncld_open(sp->nsp, yfname, COM_READ, &error, + 0 /* CE_MASTER_FAILOVER | CE_SESS_FAILED */, + 
NULL, NULL); + if (!yfh) { + applog(LOG_ERR, "CLD open(%s) failed: %d", yfname, error); + goto err_open; } /* - * No more chunks to consider in this cycle, we're all done. - * Now, poke the dispatch about the possible changes in the - * configuration of Chunk. - * - * It's possible that the CLD directories are full of all garbage, - * but no useable Chunk servers. In that case, treat everything - * like a usual retry. - * - * For the case of normal operation, we also set up a rescan, for now. - * In the future, we'll subscribe for change notification. FIXME. + * Read the Chunk's parameter file. */ - if (stor_update_cb()) { - evtimer_add(&sp->tm_rescan, &cldu_rescan_delay); - } else { - if (evtimer_add(&sp->tm_retry, &cldu_retry_delay) != 0) { - applog(LOG_ERR, "evtimer_add error %s", - strerror(errno)); - } - } - return 0; + nrp = ncld_get(yfh, &error); + if (!nrp) { + applog(LOG_ERR, "CLD get(%s) failed: %d", yfname, error); + goto err_get; + } + stor_parse(yfname, nrp->ptr, nrp->length); + ncld_read_free(nrp); + ncld_close(yfh); + free(yfname); + return; + +err_get: + ncld_close(yfh); +err_open: + free(yfname); +err_mem: + return; } /* @@ -937,7 +572,7 @@ static struct cld_session ses; */ void cld_init() { - cldc_init(); + ncld_init(); // memset(&ses, 0, sizeof(struct cld_session)); INIT_LIST_HEAD(&ses.chunks); @@ -949,10 +584,10 @@ void cld_init() int cld_begin(const char *thishost, const char *thisgroup) { static struct cld_session *sp = &ses; + struct timespec tm; + int retry_cnt; - evtimer_set(&ses.tm_retry, cldu_tm_retry, &ses); evtimer_set(&ses.tm_rescan, cldu_tm_rescan, &ses); - evtimer_set(&ses.tm_reopen, cldu_tm_reopen, &ses); if (cldu_setgroup(sp, thisgroup, thishost)) { /* Already logged error */ @@ -999,8 +634,21 @@ int cld_begin(const char *thishost, const char *thisgroup) goto err_net; } + retry_cnt = 0; + for (;;) { + if (!scan_chunks(sp)) + break; + if (++retry_cnt == 5) + goto err_scan; + tm.tv_sec = 5; + tm.tv_nsec = 0; + nanosleep(&tm, NULL); + 
} + + evtimer_add(&sp->tm_rescan, &cldu_rescan_delay); return 0; +err_scan: err_net: err_addr: err_group: @@ -1034,12 +682,9 @@ void cld_end(void) static struct cld_session *sp = &ses; int i; - if (sp->lib) { - event_del(&sp->ev); - // if (sp->sess_open) /* kill it always, include half-open */ - cldc_kill_sess(sp->lib->sess); - cldc_udp_free(sp->lib); - sp->lib = NULL; + if (sp->nsp) { + ncld_sess_close(sp->nsp); + sp->nsp = NULL; } if (!sp->forced_hosts) { @@ -1051,9 +696,7 @@ void cld_end(void) } } - evtimer_del(&sp->tm_retry); evtimer_del(&sp->tm_rescan); - evtimer_del(&sp->tm_reopen); free(sp->cfname); sp->cfname = NULL; @@ -1061,8 +704,6 @@ void cld_end(void) sp->ffname = NULL; free(sp->xfname); sp->xfname = NULL; - free(sp->yfname); - sp->yfname = NULL; free(sp->thisgroup); sp->thisgroup = NULL; free(sp->thishost); -- To unsubscribe from this list: send the line "unsubscribe hail-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html