Switch CLD from hand-rolled server poll code, to libevent. Follows similar techniques and rationale as chunkd commit c1aed7464f237e5a6309351bf003162c77d69e27. This reverts ancient commit 90b3b5edcf5aa00577f4395fdbb490ed7e9be824. Signed-off-by: Jeff Garzik <jgarzik@xxxxxxxxxx> --- cld/Makefile.am | 3 - cld/cld.h | 22 +++---- cld/server.c | 161 ++++++++++++++++++++------------------------------------ cld/session.c | 69 ++++++++++++++++-------- 4 files changed, 118 insertions(+), 137 deletions(-) diff --git a/cld/Makefile.am b/cld/Makefile.am index 9a13ce0..30eea0b 100644 --- a/cld/Makefile.am +++ b/cld/Makefile.am @@ -12,7 +12,8 @@ cld_SOURCES = cldb.h cld.h \ cldb.c msg.c server.c session.c util.c cld_LDADD = \ ../lib/libhail.la @GLIB_LIBS@ @CRYPTO_LIBS@ \ - @SSL_LIBS@ @DB4_LIBS@ @XML_LIBS@ @LIBCURL@ + @SSL_LIBS@ @DB4_LIBS@ @XML_LIBS@ @LIBCURL@ \ + @EVENT_LIBS@ cldbadm_SOURCES = cldb.h cldbadm.c cldbadm_LDADD = @CRYPTO_LIBS@ @GLIB_LIBS@ @DB4_LIBS@ diff --git a/cld/cld.h b/cld/cld.h index 4c0099f..17f14b8 100644 --- a/cld/cld.h +++ b/cld/cld.h @@ -22,8 +22,9 @@ #include <netinet/in.h> #include <sys/time.h> -#include <poll.h> +#include <event.h> #include <glib.h> +#include <elist.h> #include "cldb.h" #include <cld_msg_rpc.h> #include <cld_common.h> @@ -59,13 +60,13 @@ struct session { uint64_t last_contact; uint64_t next_fh; - struct cld_timer timer; + struct event timer; uint64_t next_seqid_in; uint64_t next_seqid_out; GList *out_q; /* outgoing pkts (to client) */ - struct cld_timer retry_timer; + struct event retry_timer; char user[CLD_MAX_USERNAME]; @@ -85,10 +86,10 @@ struct server_stats { unsigned long garbage; /* num. garbage pkts dropped */ }; -struct server_poll { +struct server_socket { int fd; - bool (*cb)(int fd, short events, void *userdata); - void *userdata; + struct event ev; + struct list_head sockets_node; }; struct server { @@ -103,14 +104,13 @@ struct server { struct cldb cldb; /* database info */ - GArray *polls; - GArray *poll_data; + struct event_base *evbase_main; - GHashTable *sessions; + struct list_head sockets; - struct cld_timer_list timers; + GHashTable *sessions; - struct cld_timer chkpt_timer; /* db4 checkpoint timer */ + struct event chkpt_timer; /* db4 checkpoint timer */ struct server_stats stats; /* global statistics */ }; diff --git a/cld/server.c b/cld/server.c index 7a57785..aed501b 100644 --- a/cld/server.c +++ b/cld/server.c @@ -559,7 +559,7 @@ static void simple_sendresp(int sock_fd, const struct client *cli, info->op); } -static bool udp_srv_event(int fd, short events, void *userdata) +static void udp_srv_event(int fd, short events, void *userdata) { struct client cli; char host[64]; @@ -586,7 +586,7 @@ static bool udp_srv_event(int fd, short events, void *userdata) rrc = recvmsg(fd, &hdr, 0); if (rrc < 0) { syslogerr("UDP recvmsg"); - return true; /* continue main loop; do NOT terminate server */ + return; } cli.addr_len = hdr.msg_namelen; @@ -601,59 +601,60 @@ static bool udp_srv_event(int fd, short events, void *userdata) if (!parse_pkt_header(raw_pkt, rrc, &pkt, &hdr_len)) { cld_srv.stats.garbage++; - return true; + return; } if (!get_pkt_info(&pkt, raw_pkt, rrc, hdr_len, &info)) { xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); cld_srv.stats.garbage++; - return true; + return; } if (packet_is_dupe(&info)) { /* silently drop dupes */ xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); - return true; + return; } err = validate_pkt_session(&info, &cli); if (err) { simple_sendresp(fd, &cli, &info, err); xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); - return true; + return; } err = pkt_chk_sig(raw_pkt, rrc, &pkt); if (err) { simple_sendresp(fd, &cli, &info, err); xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); - return true; + return; } if (!(cld_srv.cldb.is_master && cld_srv.cldb.up)) { simple_sendmsg(fd, &cli, pkt.sid, pkt.user, 0xdeadbeef, (xdrproc_t)xdr_void, NULL, CMO_NOT_MASTER); xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); - return true; + return; } err = udp_rx(fd, &cli, &info, raw_pkt, rrc); if (err) { simple_sendresp(fd, &cli, &info, err); xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); - return true; + return; } xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); - return true; } static void add_chkpt_timer(void) { - cld_timer_add(&cld_srv.timers, &cld_srv.chkpt_timer, - time(NULL) + CLD_CHKPT_SEC); + struct timeval tv = { .tv_sec = CLD_CHKPT_SEC }; + + if (evtimer_add(&cld_srv.chkpt_timer, &tv) < 0) + HAIL_WARN(&srv_log, "chkpt timer add failed"); } -static void cldb_checkpoint(struct cld_timer *timer) +static void cldb_checkpoint(int fd, short events, void *userdata) { DB_ENV *dbenv = cld_srv.cldb.env; int rc; @@ -690,28 +691,28 @@ static int net_write_port(const char *port_file, const char *port_str) static void net_close(void) { - struct pollfd *pfd; - int i; - - if (!cld_srv.polls) - return; - - for (i = 0; i < cld_srv.polls->len; i++) { - pfd = &g_array_index(cld_srv.polls, struct pollfd, i); - if (pfd->fd >= 0) { - if (close(pfd->fd) < 0) - HAIL_WARN(&srv_log, "%s(%d): %s", - __func__, pfd->fd, strerror(errno)); - pfd->fd = -1; + struct server_socket *tmp, *iter; + + list_for_each_entry_safe(tmp, iter, &cld_srv.sockets, sockets_node) { + if (tmp->fd >= 0) { + if (event_del(&tmp->ev) < 0) + HAIL_WARN(&srv_log, "Event delete(%d) failed", + tmp->fd); + if (close(tmp->fd) < 0) + HAIL_WARN(&srv_log, "Close(%d) failed: %s", + tmp->fd, strerror(errno)); + tmp->fd = -1; } + + list_del(&tmp->sockets_node); + free(tmp); } } static int net_open_socket(int addr_fam, int sock_type, int sock_prot, int addr_len, void *addr_ptr) { - struct server_poll sp; - struct pollfd pfd; + struct server_socket *sock; int fd, rc; fd = socket(addr_fam, sock_type, sock_prot); @@ -732,15 +733,25 @@ static int net_open_socket(int addr_fam, int sock_type, int sock_prot, return -errno; } - sp.fd = fd; - sp.cb = udp_srv_event; - sp.userdata = NULL; - g_array_append_val(cld_srv.poll_data, sp); + sock = calloc(1, sizeof(*sock)); + if (!sock) { + close(fd); + return -ENOMEM; + } + + sock->fd = fd; + INIT_LIST_HEAD(&sock->sockets_node); + + event_set(&sock->ev, fd, EV_READ | EV_PERSIST, + udp_srv_event, sock); + + if (event_add(&sock->ev, NULL) < 0) { + free(sock); + close(fd); + return -EIO; + } - pfd.fd = fd; - pfd.events = POLLIN; - pfd.revents = 0; - g_array_append_val(cld_srv.polls, pfd); + list_add_tail(&sock->sockets_node, &cld_srv.sockets); return fd; } @@ -891,11 +902,13 @@ static void segv_signal(int signo) static void term_signal(int signo) { server_running = false; + event_loopbreak(); } static void stats_signal(int signo) { dump_stats = true; + event_loopbreak(); } #define X(stat) \ @@ -975,73 +988,16 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state) static int main_loop(void) { - time_t next_timeout; - - next_timeout = cld_timers_run(&cld_srv.timers); - while (server_running) { - struct pollfd *pfd; - int i, fired, rc; - cld_srv.stats.poll++; - - /* poll for fd activity, or next timer event */ - rc = poll(&g_array_index(cld_srv.polls, struct pollfd, 0), - cld_srv.polls->len, - next_timeout ? (next_timeout * 1000) : -1); - if (rc < 0) { - syslogerr("poll"); - if (errno != EINTR) - break; - } + event_dispatch(); gettimeofday(¤t_time, NULL); - /* determine which fd's fired; call their callbacks */ - fired = 0; - for (i = 0; i < cld_srv.polls->len; i++) { - struct server_poll *sp; - bool runrunrun; - short revents; - - /* ref pollfd struct */ - pfd = &g_array_index(cld_srv.polls, struct pollfd, i); - - /* if no events fired, move on to next */ - if (!pfd->revents) - continue; - - fired++; - - revents = pfd->revents; - pfd->revents = 0; - - /* ref 1:1 matching server_poll struct */ - sp = &g_array_index(cld_srv.poll_data, - struct server_poll, i); - - cld_srv.stats.event++; - - /* call callback, shutting down server if requested */ - runrunrun = sp->cb(sp->fd, revents, sp->userdata); - if (!runrunrun) { - server_running = false; - break; - } - - /* if we reached poll(2) activity count, it is - * pointless to continue looping - */ - if (fired == rc) - break; - } - if (dump_stats) { dump_stats = false; stats_dump(); } - - next_timeout = cld_timers_run(&cld_srv.timers); } return 0; @@ -1052,6 +1008,8 @@ int main (int argc, char *argv[]) error_t aprc; int rc = 1; + INIT_LIST_HEAD(&cld_srv.sockets); + /* isspace() and strcasecmp() consistency requires this */ setlocale(LC_ALL, "C"); @@ -1075,6 +1033,8 @@ int main (int argc, char *argv[]) if (use_syslog) openlog(PROGRAM_NAME, LOG_PID, LOG_LOCAL3); + cld_srv.evbase_main = event_init(); + if (!(cld_srv.flags & SFL_FOREGROUND) && (daemon(1, !use_syslog) < 0)) { syslogerr("daemon"); goto err_out; @@ -1103,17 +1063,13 @@ int main (int argc, char *argv[]) ensure_root(); - cld_timer_init(&cld_srv.chkpt_timer, "db4-checkpoint", - cldb_checkpoint, NULL); + evtimer_set(&cld_srv.chkpt_timer, cldb_checkpoint, NULL); add_chkpt_timer(); rc = 1; cld_srv.sessions = g_hash_table_new(sess_hash, sess_equal); - cld_srv.poll_data = g_array_sized_new(FALSE, FALSE, - sizeof(struct server_poll), 4); - cld_srv.polls = g_array_sized_new(FALSE,FALSE,sizeof(struct pollfd), 4); - if (!cld_srv.sessions || !cld_srv.poll_data || !cld_srv.polls) + if (!cld_srv.sessions) goto err_out_pid; if (sess_load(cld_srv.sessions) != 0) @@ -1137,7 +1093,8 @@ int main (int argc, char *argv[]) HAIL_INFO(&srv_log, "shutting down"); if (strict_free) - cld_timer_del(&cld_srv.timers, &cld_srv.chkpt_timer); + if (evtimer_del(&cld_srv.chkpt_timer) < 0) + HAIL_WARN(&srv_log, "chkpt timer del failed"); if (cld_srv.cldb.up) cldb_down(&cld_srv.cldb); @@ -1149,8 +1106,6 @@ err_out_pid: err_out: if (strict_free) { net_close(); - g_array_free(cld_srv.polls, TRUE); - g_array_free(cld_srv.poll_data, TRUE); sessions_free(); g_hash_table_unref(cld_srv.sessions); } diff --git a/cld/session.c b/cld/session.c index d76186b..9887aaa 100644 --- a/cld/session.c +++ b/cld/session.c @@ -43,8 +43,8 @@ struct session_outpkt { void *done_data; }; -static void session_retry(struct cld_timer *); -static void session_timeout(struct cld_timer *); +static void session_retry(int, short, void *); +static void session_timeout(int, short, void *); static int sess_load_db(GHashTable *ss, DB_TXN *txn); static void op_unref(struct session_outpkt *op); @@ -87,8 +87,8 @@ static struct session *session_new(void) cld_rand64(&sess->next_seqid_out); - cld_timer_init(&sess->timer, "session-timeout", session_timeout, sess); - cld_timer_init(&sess->retry_timer, "session-retry", session_retry, sess); + evtimer_set(&sess->timer, session_timeout, sess); + evtimer_set(&sess->retry_timer, session_retry, sess); return sess; } @@ -103,8 +103,10 @@ static void session_free(struct session *sess, bool hash_remove) if (hash_remove) g_hash_table_remove(cld_srv.sessions, sess->sid); - cld_timer_del(&cld_srv.timers, &sess->timer); - cld_timer_del(&cld_srv.timers, &sess->retry_timer); + if (evtimer_del(&sess->timer) < 0) + HAIL_ERR(&srv_log, "sess timer del failed"); + if (evtimer_del(&sess->retry_timer) < 0) + HAIL_ERR(&srv_log, "sess retry timer del failed"); tmp = sess->out_q; while (tmp) { @@ -376,9 +378,9 @@ static void session_ping_done(struct session_outpkt *outpkt) outpkt->sess->ping_open = false; } -static void session_timeout(struct cld_timer *timer) +static void session_timeout(int fd, short events, void *userdata) { - struct session *sess = timer->userdata; + struct session *sess = userdata; uint64_t sess_expire; int rc; DB_ENV *dbenv = cld_srv.cldb.env; @@ -387,6 +389,8 @@ static void session_timeout(struct cld_timer *timer) sess_expire = sess->last_contact + CLD_SESS_TIMEOUT; if (!sess->dead && (sess_expire > now)) { + struct timeval tv; + if (!sess->ping_open && (sess_expire > (sess->last_contact + (CLD_SESS_TIMEOUT / 2) && (sess->sock_fd > 0)))) { @@ -396,9 +400,12 @@ static void session_timeout(struct cld_timer *timer) session_ping_done, NULL); } - cld_timer_add(&cld_srv.timers, &sess->timer, - now + ((sess_expire - now) / 2) + 1); - return; /* timer added; do not time out session */ + tv.tv_sec = ((sess_expire - now) / 2) + 1; + tv.tv_usec = 0; + if (evtimer_add(&sess->timer, &tv) < 0) + HAIL_ERR(&srv_log, "timer add failed, sid " SIDFMT, + SIDARG(sess->sid)); + return; /* timer added; do not time out session */ } HAIL_INFO(&srv_log, "session %s, addr %s sid " SIDFMT, @@ -554,25 +561,33 @@ static int sess_retry_output(struct session *sess, time_t *next_retry_out) return rc; } -static void session_retry(struct cld_timer *timer) +static void session_retry(int fd, short events, void *userdata) { - struct session *sess = timer->userdata; + struct session *sess = userdata; time_t next_retry; + time_t now = time(NULL); + struct timeval tv; if (!sess->out_q) return; sess_retry_output(sess, &next_retry); - cld_timer_add(&cld_srv.timers, &sess->retry_timer, next_retry); + tv.tv_sec = next_retry - now; + tv.tv_usec = 0; + + if (evtimer_add(&sess->retry_timer, &tv) < 0) + HAIL_ERR(&srv_log, "retry timer re-add failed"); } static void session_outq(struct session *sess, GList *new_pkts) { /* if out_q empty, start retry timer */ - if (!sess->out_q) - cld_timer_add(&cld_srv.timers, &sess->retry_timer, - time(NULL) + CLD_RETRY_START); + if (!sess->out_q) { + struct timeval tv = { .tv_sec = CLD_RETRY_START }; + if (evtimer_add(&sess->retry_timer, &tv) < 0) + HAIL_ERR(&srv_log, "retry timer start failed"); + } sess->out_q = g_list_concat(sess->out_q, new_pkts); } @@ -766,7 +781,8 @@ void msg_ack(struct session *sess, uint64_t seqid) } if (!sess->out_q) - cld_timer_del(&cld_srv.timers, &sess->retry_timer); + if (evtimer_del(&sess->retry_timer) < 0) + HAIL_ERR(&srv_log, "sess retry timer del 2 failed"); } void msg_new_sess(int sock_fd, const struct client *cli, @@ -780,6 +796,7 @@ void msg_new_sess(int sock_fd, const struct client *cli, int rc; enum cle_err_codes resp_rc = CLE_OK; struct cld_msg_generic_resp resp; + struct timeval tv; sess = session_new(); if (!sess) { @@ -832,8 +849,10 @@ void msg_new_sess(int sock_fd, const struct client *cli, g_hash_table_insert(cld_srv.sessions, sess->sid, sess); /* begin session timer */ - cld_timer_add(&cld_srv.timers, &sess->timer, - time(NULL) + (CLD_SESS_TIMEOUT / 2)); + tv.tv_sec = CLD_SESS_TIMEOUT / 2; + tv.tv_usec = 0; + if (evtimer_add(&sess->timer, &tv) < 0) + HAIL_ERR(&srv_log, "sess timer start failed"); /* send new-sess reply */ resp.code = CLE_OK; @@ -933,6 +952,8 @@ static int sess_load_db(GHashTable *ss, DB_TXN *txn) val.flags = DB_DBT_USERMEM; while (1) { + struct timeval tv; + rc = cur->get(cur, &key, &val, DB_NEXT); if (rc == DB_NOTFOUND) break; @@ -960,8 +981,12 @@ static int sess_load_db(GHashTable *ss, DB_TXN *txn) g_hash_table_insert(ss, sess->sid, sess); /* begin session timer */ - cld_timer_add(&cld_srv.timers, &sess->timer, - time(NULL) + (CLD_SESS_TIMEOUT / 2)); + tv.tv_sec = CLD_SESS_TIMEOUT / 2; + tv.tv_usec = 0; + if (evtimer_add(&sess->timer, &tv) < 0) { + HAIL_ERR(&srv_log, "sess timer loop start failed"); + break; + } } cur->close(cur); -- To unsubscribe from this list: send the line "unsubscribe hail-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html