[PATCH 1/3] CLD: convert back to libevent

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Switch CLD from hand-rolled server poll code to libevent.  This follows
the same techniques and rationale as chunkd commit
c1aed7464f237e5a6309351bf003162c77d69e27, and reverts ancient commit
90b3b5edcf5aa00577f4395fdbb490ed7e9be824.

Signed-off-by: Jeff Garzik <jgarzik@xxxxxxxxxx>
---
 cld/Makefile.am |    3 -
 cld/cld.h       |   22 +++----
 cld/server.c    |  161 ++++++++++++++++++++------------------------------------
 cld/session.c   |   69 ++++++++++++++++--------
 4 files changed, 118 insertions(+), 137 deletions(-)

diff --git a/cld/Makefile.am b/cld/Makefile.am
index 9a13ce0..30eea0b 100644
--- a/cld/Makefile.am
+++ b/cld/Makefile.am
@@ -12,7 +12,8 @@ cld_SOURCES	= cldb.h cld.h \
 		  cldb.c msg.c server.c session.c util.c
 cld_LDADD	= \
 		  ../lib/libhail.la @GLIB_LIBS@ @CRYPTO_LIBS@ \
-		  @SSL_LIBS@ @DB4_LIBS@ @XML_LIBS@ @LIBCURL@
+		  @SSL_LIBS@ @DB4_LIBS@ @XML_LIBS@ @LIBCURL@ \
+		  @EVENT_LIBS@
 
 cldbadm_SOURCES	= cldb.h cldbadm.c
 cldbadm_LDADD	= @CRYPTO_LIBS@ @GLIB_LIBS@ @DB4_LIBS@
diff --git a/cld/cld.h b/cld/cld.h
index 4c0099f..17f14b8 100644
--- a/cld/cld.h
+++ b/cld/cld.h
@@ -22,8 +22,9 @@
 
 #include <netinet/in.h>
 #include <sys/time.h>
-#include <poll.h>
+#include <event.h>
 #include <glib.h>
+#include <elist.h>
 #include "cldb.h"
 #include <cld_msg_rpc.h>
 #include <cld_common.h>
@@ -59,13 +60,13 @@ struct session {
 
 	uint64_t		last_contact;
 	uint64_t		next_fh;
-	struct cld_timer	timer;
+	struct event		timer;
 
 	uint64_t		next_seqid_in;
 	uint64_t		next_seqid_out;
 
 	GList			*out_q;		/* outgoing pkts (to client) */
-	struct cld_timer	retry_timer;
+	struct event		retry_timer;
 
 	char			user[CLD_MAX_USERNAME];
 
@@ -85,10 +86,10 @@ struct server_stats {
 	unsigned long		garbage;	/* num. garbage pkts dropped */
 };
 
-struct server_poll {
+struct server_socket {
 	int			fd;
-	bool			(*cb)(int fd, short events, void *userdata);
-	void			*userdata;
+	struct event		ev;
+	struct list_head	sockets_node;
 };
 
 struct server {
@@ -103,14 +104,13 @@ struct server {
 
 	struct cldb		cldb;		/* database info */
 
-	GArray			*polls;
-	GArray			*poll_data;
+	struct event_base	*evbase_main;
 
-	GHashTable		*sessions;
+	struct list_head	sockets;
 
-	struct cld_timer_list	timers;
+	GHashTable		*sessions;
 
-	struct cld_timer	chkpt_timer;	/* db4 checkpoint timer */
+	struct event		chkpt_timer;	/* db4 checkpoint timer */
 
 	struct server_stats	stats;		/* global statistics */
 };
diff --git a/cld/server.c b/cld/server.c
index 7a57785..aed501b 100644
--- a/cld/server.c
+++ b/cld/server.c
@@ -559,7 +559,7 @@ static void simple_sendresp(int sock_fd, const struct client *cli,
 		       info->op);
 }
 
-static bool udp_srv_event(int fd, short events, void *userdata)
+static void udp_srv_event(int fd, short events, void *userdata)
 {
 	struct client cli;
 	char host[64];
@@ -586,7 +586,7 @@ static bool udp_srv_event(int fd, short events, void *userdata)
 	rrc = recvmsg(fd, &hdr, 0);
 	if (rrc < 0) {
 		syslogerr("UDP recvmsg");
-		return true; /* continue main loop; do NOT terminate server */
+		return;
 	}
 	cli.addr_len = hdr.msg_namelen;
 
@@ -601,59 +601,60 @@ static bool udp_srv_event(int fd, short events, void *userdata)
 
 	if (!parse_pkt_header(raw_pkt, rrc, &pkt, &hdr_len)) {
 		cld_srv.stats.garbage++;
-		return true;
+		return;
 	}
 
 	if (!get_pkt_info(&pkt, raw_pkt, rrc, hdr_len, &info)) {
 		xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
 		cld_srv.stats.garbage++;
-		return true;
+		return;
 	}
 
 	if (packet_is_dupe(&info)) {
 		/* silently drop dupes */
 		xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
-		return true;
+		return;
 	}
 
 	err = validate_pkt_session(&info, &cli);
 	if (err) {
 		simple_sendresp(fd, &cli, &info, err);
 		xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
-		return true;
+		return;
 	}
 
 	err = pkt_chk_sig(raw_pkt, rrc, &pkt);
 	if (err) {
 		simple_sendresp(fd, &cli, &info, err);
 		xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
-		return true;
+		return;
 	}
 
 	if (!(cld_srv.cldb.is_master && cld_srv.cldb.up)) {
 		simple_sendmsg(fd, &cli, pkt.sid, pkt.user, 0xdeadbeef,
 			       (xdrproc_t)xdr_void, NULL, CMO_NOT_MASTER);
 		xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
-		return true;
+		return;
 	}
 
 	err = udp_rx(fd, &cli, &info, raw_pkt, rrc);
 	if (err) {
 		simple_sendresp(fd, &cli, &info, err);
 		xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
-		return true;
+		return;
 	}
 	xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
-	return true;
 }
 
 static void add_chkpt_timer(void)
 {
-	cld_timer_add(&cld_srv.timers, &cld_srv.chkpt_timer,
-		      time(NULL) + CLD_CHKPT_SEC);
+	struct timeval tv = { .tv_sec = CLD_CHKPT_SEC };
+
+	if (evtimer_add(&cld_srv.chkpt_timer, &tv) < 0)
+		HAIL_WARN(&srv_log, "chkpt timer add failed");
 }
 
-static void cldb_checkpoint(struct cld_timer *timer)
+static void cldb_checkpoint(int fd, short events, void *userdata)
 {
 	DB_ENV *dbenv = cld_srv.cldb.env;
 	int rc;
@@ -690,28 +691,28 @@ static int net_write_port(const char *port_file, const char *port_str)
 
 static void net_close(void)
 {
-	struct pollfd *pfd;
-	int i;
-
-	if (!cld_srv.polls)
-		return;
-
-	for (i = 0; i < cld_srv.polls->len; i++) {
-		pfd = &g_array_index(cld_srv.polls, struct pollfd, i);
-		if (pfd->fd >= 0) {
-			if (close(pfd->fd) < 0)
-				HAIL_WARN(&srv_log, "%s(%d): %s",
-					  __func__, pfd->fd, strerror(errno));
-			pfd->fd = -1;
+	struct server_socket *tmp, *iter;
+
+	list_for_each_entry_safe(tmp, iter, &cld_srv.sockets, sockets_node) {
+		if (tmp->fd >= 0) {
+			if (event_del(&tmp->ev) < 0)
+				HAIL_WARN(&srv_log, "Event delete(%d) failed",
+					  tmp->fd);
+			if (close(tmp->fd) < 0)
+				HAIL_WARN(&srv_log, "Close(%d) failed: %s",
+					  tmp->fd, strerror(errno));
+			tmp->fd = -1;
 		}
+
+		list_del(&tmp->sockets_node);
+		free(tmp);
 	}
 }
 
 static int net_open_socket(int addr_fam, int sock_type, int sock_prot,
 			   int addr_len, void *addr_ptr)
 {
-	struct server_poll sp;
-	struct pollfd pfd;
+	struct server_socket *sock;
 	int fd, rc;
 
 	fd = socket(addr_fam, sock_type, sock_prot);
@@ -732,15 +733,25 @@ static int net_open_socket(int addr_fam, int sock_type, int sock_prot,
 		return -errno;
 	}
 
-	sp.fd = fd;
-	sp.cb = udp_srv_event;
-	sp.userdata = NULL;
-	g_array_append_val(cld_srv.poll_data, sp);
+	sock = calloc(1, sizeof(*sock));
+	if (!sock) {
+		close(fd);
+		return -ENOMEM;
+	}
+
+	sock->fd = fd;
+	INIT_LIST_HEAD(&sock->sockets_node);
+
+	event_set(&sock->ev, fd, EV_READ | EV_PERSIST,
+		  udp_srv_event, sock);
+
+	if (event_add(&sock->ev, NULL) < 0) {
+		free(sock);
+		close(fd);
+		return -EIO;
+	}
 
-	pfd.fd = fd;
-	pfd.events = POLLIN;
-	pfd.revents = 0;
-	g_array_append_val(cld_srv.polls, pfd);
+	list_add_tail(&sock->sockets_node, &cld_srv.sockets);
 
 	return fd;
 }
@@ -891,11 +902,13 @@ static void segv_signal(int signo)
 static void term_signal(int signo)
 {
 	server_running = false;
+	event_loopbreak();
 }
 
 static void stats_signal(int signo)
 {
 	dump_stats = true;
+	event_loopbreak();
 }
 
 #define X(stat) \
@@ -975,73 +988,16 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
 
 static int main_loop(void)
 {
-	time_t next_timeout;
-
-	next_timeout = cld_timers_run(&cld_srv.timers);
-
 	while (server_running) {
-		struct pollfd *pfd;
-		int i, fired, rc;
-
 		cld_srv.stats.poll++;
-
-		/* poll for fd activity, or next timer event */
-		rc = poll(&g_array_index(cld_srv.polls, struct pollfd, 0),
-			  cld_srv.polls->len,
-			  next_timeout ? (next_timeout * 1000) : -1);
-		if (rc < 0) {
-			syslogerr("poll");
-			if (errno != EINTR)
-				break;
-		}
+		event_dispatch();
 
 		gettimeofday(&current_time, NULL);
 
-		/* determine which fd's fired; call their callbacks */
-		fired = 0;
-		for (i = 0; i < cld_srv.polls->len; i++) {
-			struct server_poll *sp;
-			bool runrunrun;
-			short revents;
-
-			/* ref pollfd struct */
-			pfd = &g_array_index(cld_srv.polls, struct pollfd, i);
-
-			/* if no events fired, move on to next */
-			if (!pfd->revents)
-				continue;
-
-			fired++;
-
-			revents = pfd->revents;
-			pfd->revents = 0;
-
-			/* ref 1:1 matching server_poll struct */
-			sp = &g_array_index(cld_srv.poll_data,
-					    struct server_poll, i);
-
-			cld_srv.stats.event++;
-
-			/* call callback, shutting down server if requested */
-			runrunrun = sp->cb(sp->fd, revents, sp->userdata);
-			if (!runrunrun) {
-				server_running = false;
-				break;
-			}
-
-			/* if we reached poll(2) activity count, it is
-			 * pointless to continue looping
-			 */
-			if (fired == rc)
-				break;
-		}
-
 		if (dump_stats) {
 			dump_stats = false;
 			stats_dump();
 		}
-
-		next_timeout = cld_timers_run(&cld_srv.timers);
 	}
 
 	return 0;
@@ -1052,6 +1008,8 @@ int main (int argc, char *argv[])
 	error_t aprc;
 	int rc = 1;
 
+	INIT_LIST_HEAD(&cld_srv.sockets);
+
 	/* isspace() and strcasecmp() consistency requires this */
 	setlocale(LC_ALL, "C");
 
@@ -1075,6 +1033,8 @@ int main (int argc, char *argv[])
 	if (use_syslog)
 		openlog(PROGRAM_NAME, LOG_PID, LOG_LOCAL3);
 
+	cld_srv.evbase_main = event_init();
+
 	if (!(cld_srv.flags & SFL_FOREGROUND) && (daemon(1, !use_syslog) < 0)) {
 		syslogerr("daemon");
 		goto err_out;
@@ -1103,17 +1063,13 @@ int main (int argc, char *argv[])
 
 	ensure_root();
 
-	cld_timer_init(&cld_srv.chkpt_timer, "db4-checkpoint",
-		       cldb_checkpoint, NULL);
+	evtimer_set(&cld_srv.chkpt_timer, cldb_checkpoint, NULL);
 	add_chkpt_timer();
 
 	rc = 1;
 
 	cld_srv.sessions = g_hash_table_new(sess_hash, sess_equal);
-	cld_srv.poll_data = g_array_sized_new(FALSE, FALSE,
-					   sizeof(struct server_poll), 4);
-	cld_srv.polls = g_array_sized_new(FALSE,FALSE,sizeof(struct pollfd), 4);
-	if (!cld_srv.sessions || !cld_srv.poll_data || !cld_srv.polls)
+	if (!cld_srv.sessions)
 		goto err_out_pid;
 
 	if (sess_load(cld_srv.sessions) != 0)
@@ -1137,7 +1093,8 @@ int main (int argc, char *argv[])
 	HAIL_INFO(&srv_log, "shutting down");
 
 	if (strict_free)
-		cld_timer_del(&cld_srv.timers, &cld_srv.chkpt_timer);
+		if (evtimer_del(&cld_srv.chkpt_timer) < 0)
+			HAIL_WARN(&srv_log, "chkpt timer del failed");
 
 	if (cld_srv.cldb.up)
 		cldb_down(&cld_srv.cldb);
@@ -1149,8 +1106,6 @@ err_out_pid:
 err_out:
 	if (strict_free) {
 		net_close();
-		g_array_free(cld_srv.polls, TRUE);
-		g_array_free(cld_srv.poll_data, TRUE);
 		sessions_free();
 		g_hash_table_unref(cld_srv.sessions);
 	}
diff --git a/cld/session.c b/cld/session.c
index d76186b..9887aaa 100644
--- a/cld/session.c
+++ b/cld/session.c
@@ -43,8 +43,8 @@ struct session_outpkt {
 	void			*done_data;
 };
 
-static void session_retry(struct cld_timer *);
-static void session_timeout(struct cld_timer *);
+static void session_retry(int, short, void *);
+static void session_timeout(int, short, void *);
 static int sess_load_db(GHashTable *ss, DB_TXN *txn);
 static void op_unref(struct session_outpkt *op);
 
@@ -87,8 +87,8 @@ static struct session *session_new(void)
 
 	cld_rand64(&sess->next_seqid_out);
 
-	cld_timer_init(&sess->timer, "session-timeout", session_timeout, sess);
-	cld_timer_init(&sess->retry_timer, "session-retry", session_retry, sess);
+	evtimer_set(&sess->timer, session_timeout, sess);
+	evtimer_set(&sess->retry_timer, session_retry, sess);
 
 	return sess;
 }
@@ -103,8 +103,10 @@ static void session_free(struct session *sess, bool hash_remove)
 	if (hash_remove)
 		g_hash_table_remove(cld_srv.sessions, sess->sid);
 
-	cld_timer_del(&cld_srv.timers, &sess->timer);
-	cld_timer_del(&cld_srv.timers, &sess->retry_timer);
+	if (evtimer_del(&sess->timer) < 0)
+		HAIL_ERR(&srv_log, "sess timer del failed");
+	if (evtimer_del(&sess->retry_timer) < 0)
+		HAIL_ERR(&srv_log, "sess retry timer del failed");
 
 	tmp = sess->out_q;
 	while (tmp) {
@@ -376,9 +378,9 @@ static void session_ping_done(struct session_outpkt *outpkt)
 	outpkt->sess->ping_open = false;
 }
 
-static void session_timeout(struct cld_timer *timer)
+static void session_timeout(int fd, short events, void *userdata)
 {
-	struct session *sess = timer->userdata;
+	struct session *sess = userdata;
 	uint64_t sess_expire;
 	int rc;
 	DB_ENV *dbenv = cld_srv.cldb.env;
@@ -387,6 +389,8 @@ static void session_timeout(struct cld_timer *timer)
 
 	sess_expire = sess->last_contact + CLD_SESS_TIMEOUT;
 	if (!sess->dead && (sess_expire > now)) {
+		struct timeval tv;
+
 		if (!sess->ping_open &&
 		    (sess_expire > (sess->last_contact + (CLD_SESS_TIMEOUT / 2) &&
 		    (sess->sock_fd > 0)))) {
@@ -396,9 +400,12 @@ static void session_timeout(struct cld_timer *timer)
 				     session_ping_done, NULL);
 		}
 
-		cld_timer_add(&cld_srv.timers, &sess->timer,
-			      now + ((sess_expire - now) / 2) + 1);
-		return;	/* timer added; do not time out session */
+		tv.tv_sec = ((sess_expire - now) / 2) + 1;
+		tv.tv_usec = 0;
+		if (evtimer_add(&sess->timer, &tv) < 0)
+			HAIL_ERR(&srv_log, "timer add failed, sid " SIDFMT,
+				 SIDARG(sess->sid));
+		return;		/* timer added; do not time out session */
 	}
 
 	HAIL_INFO(&srv_log, "session %s, addr %s sid " SIDFMT,
@@ -554,25 +561,33 @@ static int sess_retry_output(struct session *sess, time_t *next_retry_out)
 	return rc;
 }
 
-static void session_retry(struct cld_timer *timer)
+static void session_retry(int fd, short events, void *userdata)
 {
-	struct session *sess = timer->userdata;
+	struct session *sess = userdata;
 	time_t next_retry;
+	time_t now = time(NULL);
+	struct timeval tv;
 
 	if (!sess->out_q)
 		return;
 
 	sess_retry_output(sess, &next_retry);
 
-	cld_timer_add(&cld_srv.timers, &sess->retry_timer, next_retry);
+	tv.tv_sec = next_retry - now;
+	tv.tv_usec = 0;
+
+	if (evtimer_add(&sess->retry_timer, &tv) < 0)
+		HAIL_ERR(&srv_log, "retry timer re-add failed");
 }
 
 static void session_outq(struct session *sess, GList *new_pkts)
 {
 	/* if out_q empty, start retry timer */
-	if (!sess->out_q)
-		cld_timer_add(&cld_srv.timers, &sess->retry_timer,
-			      time(NULL) + CLD_RETRY_START);
+	if (!sess->out_q) {
+		struct timeval tv = { .tv_sec = CLD_RETRY_START };
+		if (evtimer_add(&sess->retry_timer, &tv) < 0)
+			HAIL_ERR(&srv_log, "retry timer start failed");
+	}
 
 	sess->out_q = g_list_concat(sess->out_q, new_pkts);
 }
@@ -766,7 +781,8 @@ void msg_ack(struct session *sess, uint64_t seqid)
 	}
 
 	if (!sess->out_q)
-		cld_timer_del(&cld_srv.timers, &sess->retry_timer);
+		if (evtimer_del(&sess->retry_timer) < 0)
+			HAIL_ERR(&srv_log, "sess retry timer del 2 failed");
 }
 
 void msg_new_sess(int sock_fd, const struct client *cli,
@@ -780,6 +796,7 @@ void msg_new_sess(int sock_fd, const struct client *cli,
 	int rc;
 	enum cle_err_codes resp_rc = CLE_OK;
 	struct cld_msg_generic_resp resp;
+	struct timeval tv;
 
 	sess = session_new();
 	if (!sess) {
@@ -832,8 +849,10 @@ void msg_new_sess(int sock_fd, const struct client *cli,
 	g_hash_table_insert(cld_srv.sessions, sess->sid, sess);
 
 	/* begin session timer */
-	cld_timer_add(&cld_srv.timers, &sess->timer,
-		      time(NULL) + (CLD_SESS_TIMEOUT / 2));
+	tv.tv_sec = CLD_SESS_TIMEOUT / 2;
+	tv.tv_usec = 0;
+	if (evtimer_add(&sess->timer, &tv) < 0)
+		HAIL_ERR(&srv_log, "sess timer start failed");
 
 	/* send new-sess reply */
 	resp.code = CLE_OK;
@@ -933,6 +952,8 @@ static int sess_load_db(GHashTable *ss, DB_TXN *txn)
 	val.flags = DB_DBT_USERMEM;
 
 	while (1) {
+		struct timeval tv;
+
 		rc = cur->get(cur, &key, &val, DB_NEXT);
 		if (rc == DB_NOTFOUND)
 			break;
@@ -960,8 +981,12 @@ static int sess_load_db(GHashTable *ss, DB_TXN *txn)
 		g_hash_table_insert(ss, sess->sid, sess);
 
 		/* begin session timer */
-		cld_timer_add(&cld_srv.timers, &sess->timer,
-			      time(NULL) + (CLD_SESS_TIMEOUT / 2));
+		tv.tv_sec = CLD_SESS_TIMEOUT / 2;
+		tv.tv_usec = 0;
+		if (evtimer_add(&sess->timer, &tv) < 0) {
+			HAIL_ERR(&srv_log, "sess timer loop start failed");
+			break;
+		}
 	}
 
 	cur->close(cur);
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [Fedora Cloud]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux