[tabled patch v2] abstract out TCP-write code

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Changes from v1:
- avoid referencing dead struct client (grep for 'invalidate_cli'),
  by changing FSM callback prototype.
- insert 'void *priv' member into struct atcp_wr_state, and replace
  cb_data1/cb_data2 callback parameters with (struct atcp_wr_state *, void *).
  struct client / struct session, or whatever, may be stored in
  atcp_wr_state::priv.
- minor API polishing and further abstraction

 server/Makefile.am |    1 
 server/atcp.c      |  238 +++++++++++++++++++++++++++++++++++++++++++++++
 server/atcp.h      |  100 +++++++++++++++++++
 server/bucket.c    |    8 -
 server/object.c    |   56 +++++------
 server/server.c    |  268 +++++++++--------------------------------------------
 server/status.c    |    3 
 server/tabled.h    |   46 ++-------
 8 files changed, 436 insertions(+), 284 deletions(-)

diff --git a/server/Makefile.am b/server/Makefile.am
index 5b53a0a..5e0abd5 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -4,6 +4,7 @@ INCLUDES	= -I$(top_srcdir)/include @GLIB_CFLAGS@ @HAIL_CFLAGS@
 sbin_PROGRAMS	= tabled tdbadm
 
 tabled_SOURCES	= tabled.h		\
+		  atcp.c atcp.h		\
 		  bucket.c cldu.c config.c metarep.c object.c replica.c \
 		  server.c status.c storage.c storparse.c util.c
 tabled_LDADD	= ../lib/libtdb.a		\
diff --git a/server/atcp.c b/server/atcp.c
new file mode 100644
index 0000000..0050a68
--- /dev/null
+++ b/server/atcp.c
@@ -0,0 +1,238 @@
+
+/*
+ * Copyright 2010 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING.  If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define _GNU_SOURCE
+#include "tabled-config.h"
+
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <sys/uio.h>
+#include "atcp.h"
+
+bool atcp_cb_free(struct atcp_wr_state *wst, void *cb_data, bool done)
+{
+	free(cb_data);
+	return false;
+}
+
+static void atcp_write_complete(struct atcp_write *tmp)
+{
+	struct atcp_wr_state *wst = tmp->wst;
+
+	list_del(&tmp->node);
+	list_add_tail(&tmp->node, &wst->write_compl_q);
+}
+
+static bool atcp_write_free(struct atcp_write *tmp, bool done)
+{
+	struct atcp_wr_state *wst = tmp->wst;
+	bool rcb = false;
+
+	wst->write_cnt -= tmp->length;
+	list_del_init(&tmp->node);
+	if (tmp->cb)
+		rcb = tmp->cb(wst, tmp->cb_data, done);
+	free(tmp);
+
+	return rcb;
+}
+
+bool atcp_write_run_compl(struct atcp_wr_state *wst)
+{
+	struct atcp_write *wr;
+	bool do_loop;
+
+	do_loop = false;
+	while (!list_empty(&wst->write_compl_q)) {
+		wr = list_entry(wst->write_compl_q.next,
+				struct atcp_write, node);
+		do_loop |= atcp_write_free(wr, true);
+	}
+	return do_loop;
+}
+
+void atcp_write_free_all(struct atcp_wr_state *wst)
+{
+	struct atcp_write *wr, *tmp;
+
+	atcp_write_run_compl(wst);
+	list_for_each_entry_safe(wr, tmp, &wst->write_q, node) {
+		atcp_write_free(wr, false);
+	}
+}
+
+static bool atcp_writable(struct atcp_wr_state *wst)
+{
+	int n_iov;
+	struct atcp_write *tmp;
+	ssize_t rc;
+	struct iovec iov[ATCP_MAX_WR_IOV];
+
+	/* accumulate pending writes into iovec */
+	n_iov = 0;
+	list_for_each_entry(tmp, &wst->write_q, node) {
+		if (n_iov == ATCP_MAX_WR_IOV)
+			break;
+		/* bleh, struct iovec should declare iov_base const */
+		iov[n_iov].iov_base = (void *) tmp->buf;
+		iov[n_iov].iov_len = tmp->togo;
+		n_iov++;
+	}
+
+	/* execute non-blocking write */
+do_write:
+	rc = writev(wst->fd, iov, n_iov);
+	if (rc < 0) {
+		if (errno == EINTR)
+			goto do_write;
+		if (errno != EAGAIN)
+			goto err_out;
+		return true;
+	}
+
+	/* iterate through write queue, issuing completions based on
+	 * amount of data written
+	 */
+	while (rc > 0) {
+		int sz;
+
+		/* get pointer to first record on list */
+		tmp = list_entry(wst->write_q.next, struct atcp_write, node);
+
+		/* mark data consumed by decreasing tmp->len */
+		sz = (tmp->togo < rc) ? tmp->togo : rc;
+		tmp->togo -= sz;
+		tmp->buf += sz;
+		rc -= sz;
+
+		/* if tmp->len reaches zero, write is complete,
+		 * so schedule it for clean up (cannot call callback
+		 * right away or an endless recursion will result)
+		 */
+		if (tmp->togo == 0)
+			atcp_write_complete(tmp);
+	}
+
+	/* if we emptied the queue, clear write notification */
+	if (list_empty(&wst->write_q)) {
+		wst->writing = false;
+		if (event_del(&wst->write_ev) < 0)
+			goto err_out;
+	}
+
+	return true;
+
+err_out:
+	atcp_write_free_all(wst);
+	return false;
+}
+
+static void atcp_wr_event(int fd, short events, void *userdata)
+{
+	struct atcp_wr_state *wst = userdata;
+
+	atcp_writable(wst);
+	atcp_write_run_compl(wst);
+}
+
+void atcp_wr_set_fd(struct atcp_wr_state *wst, int fd)
+{
+	wst->fd = fd;
+
+	event_set(&wst->write_ev, fd, EV_WRITE | EV_PERSIST,
+		  atcp_wr_event, wst);
+}
+
+bool atcp_write_start(struct atcp_wr_state *wst)
+{
+	if (list_empty(&wst->write_q))
+		return true;		/* loop, not poll */
+
+	/* if write-poll already active, nothing further to do */
+	if (wst->writing)
+		return false;		/* poll wait */
+
+	/* attempt optimistic write, in hopes of avoiding poll,
+	 * or at least refill the write buffers so as to not
+	 * get -immediately- called again by the kernel
+	 */
+	atcp_writable(wst);
+	if (list_empty(&wst->write_q)) {
+		wst->opt_write++;
+		return true;		/* loop, not poll */
+	}
+
+	if (event_add(&wst->write_ev, NULL) < 0)
+		return true;		/* loop, not poll */
+
+	wst->writing = true;
+
+	return false;			/* poll wait */
+}
+
+int atcp_writeq(struct atcp_wr_state *wst, const void *buf, unsigned int buflen,
+	        atcp_write_func cb, void *cb_data)
+{
+	struct atcp_write *wr;
+
+	if (!buf || !buflen)
+		return -EINVAL;
+
+	wr = calloc(1, sizeof(struct atcp_write));
+	if (!wr)
+		return -ENOMEM;
+
+	wr->buf = buf;
+	wr->togo = buflen;
+	wr->length = buflen;
+	wr->cb = cb;
+	wr->cb_data = cb_data;
+	wr->wst = wst;
+	list_add_tail(&wr->node, &wst->write_q);
+	wst->write_cnt += buflen;
+	if (wst->write_cnt > wst->write_cnt_max)
+		wst->write_cnt_max = wst->write_cnt;
+
+	return 0;
+}
+
+void atcp_wr_exit(struct atcp_wr_state *wst)
+{
+	if (!wst)
+		return;
+
+	if (wst->writing)
+		event_del(&wst->write_ev);
+	
+	atcp_write_free_all(wst);
+}
+
+void atcp_wr_init(struct atcp_wr_state *wst, void *priv)
+{
+	memset(wst, 0, sizeof(*wst));
+
+	INIT_LIST_HEAD(&wst->write_q);
+	INIT_LIST_HEAD(&wst->write_compl_q);
+
+	wst->fd = -1;
+
+	wst->priv = priv;
+}
+
diff --git a/server/atcp.h b/server/atcp.h
new file mode 100644
index 0000000..e611aab
--- /dev/null
+++ b/server/atcp.h
@@ -0,0 +1,100 @@
+#ifndef __ATCP_H__
+#define __ATCP_H__
+
+/*
+ * Copyright 2010 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING.  If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <event.h>
+#include <elist.h>
+
+enum {
+	ATCP_MAX_WR_IOV		= 32,		/* max iov per writev(2) */
+};
+
+struct atcp_wr_state {
+	int			fd;		/* our socket */
+
+	bool			writing;	/* actively trying to write? */
+
+	size_t			write_cnt;	/* water level */
+	size_t			write_cnt_max;
+
+	struct list_head	write_q;	/* list of async writes */
+	struct list_head	write_compl_q;	/* list of done writes */
+
+	struct event		write_ev;
+
+	void			*priv;		/* untouched by atcp */
+
+	/* various statistics */
+	uint64_t		opt_write;	/* optimistic writes */
+};
+
+typedef bool (*atcp_write_func)(struct atcp_wr_state *, void *, bool);
+
+struct atcp_write {
+	const void		*buf;		/* write buffer pointer */
+	int			togo;		/* write buffer remainder */
+
+	int			length;		/* length for accounting */
+	atcp_write_func		cb;		/* callback */
+	void			*cb_data;	/* data passed to cb */
+
+	struct atcp_wr_state	*wst;		/* our parent */
+
+	struct list_head	node;		/* write_[compl_]q list node */
+};
+
+/* setup and teardown atcp write state */
+extern void atcp_wr_exit(struct atcp_wr_state *wst);
+extern void atcp_wr_init(struct atcp_wr_state *wst, void *priv);
+
+/* generic write callback, that call free(cb_data2) */
+extern bool atcp_cb_free(struct atcp_wr_state *wst, void *cb_data, bool done);
+
+/* clear all write queues immediately, even if not complete */
+extern void atcp_write_free_all(struct atcp_wr_state *wst);
+
+/* complete all writes found on completion queue */
+extern bool atcp_write_run_compl(struct atcp_wr_state *wst);
+
+/* initialize internal fd, event setup */
+extern void atcp_wr_set_fd(struct atcp_wr_state *wst, int fd);
+
+/* add a buffer to the write queue */
+extern int atcp_writeq(struct atcp_wr_state *wst, const void *buf, unsigned int buflen,
+	        atcp_write_func cb, void *cb_data);
+
+/* begin pushing write queue to socket */
+extern bool atcp_write_start(struct atcp_wr_state *wst);
+
+/* is anything on the write queue at the moment? */
+static inline bool atcp_wq_empty(struct atcp_wr_state *wst)
+{
+	return list_empty(&wst->write_q) ? true : false;
+}
+
+/* total number of octets queued at this moment */
+static inline size_t atcp_wqueued(struct atcp_wr_state *wst)
+{
+	return wst->write_cnt;
+}
+
+#endif /* __ATCP_H__ */
diff --git a/server/bucket.c b/server/bucket.c
index eb03e03..a81eca1 100644
--- a/server/bucket.c
+++ b/server/bucket.c
@@ -566,13 +566,13 @@ bool bucket_add(struct client *cli, const char *user, const char *bucket)
 		     bucket) < 0)
 		return cli_err(cli, InternalError);
 
-	rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+	rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 	if (rc) {
 		free(hdr);
 		return true;
 	}
 
-	return cli_write_start(cli);
+	return atcp_write_start(&cli->wst);
 
 err_out:
 	rc = txn->abort(txn);
@@ -718,13 +718,13 @@ bool bucket_del(struct client *cli, const char *user, const char *bucket)
 		     hutil_time2str(timestr, sizeof(timestr), time(NULL))) < 0)
 		return cli_err(cli, InternalError);
 
-	rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+	rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 	if (rc) {
 		free(hdr);
 		return true;
 	}
 
-	return cli_write_start(cli);
+	return atcp_write_start(&cli->wst);
 
 err_out:
 	rc = txn->abort(txn);
diff --git a/server/object.c b/server/object.c
index 3801e94..64cc8b6 100644
--- a/server/object.c
+++ b/server/object.c
@@ -227,13 +227,13 @@ bool object_del(struct client *cli, const char *user,
 		     hutil_time2str(timestr, sizeof(timestr), time(NULL))) < 0)
 		return cli_err(cli, InternalError);
 
-	rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+	rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 	if (rc) {
 		free(hdr);
 		return true;
 	}
 
-	return cli_write_start(cli);
+	return atcp_write_start(&cli->wst);
 
 err_out:
 	rc = txn->abort(txn);
@@ -525,13 +525,13 @@ static bool object_put_end(struct client *cli)
 		return cli_err(cli, InternalError);
 	}
 
-	rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+	rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 	if (rc) {
 		free(hdr);
 		return true;
 	}
 
-	return cli_write_start(cli);
+	return atcp_write_start(&cli->wst);
 
 err_out_rb:
 	rc = txn->abort(txn);
@@ -618,7 +618,8 @@ static int object_put_buf(struct client *cli, struct open_chunk *ochunk,
 	return 0;
 }
 
-bool cli_evt_http_data_in(struct client *cli, unsigned int events)
+bool cli_evt_http_data_in(struct client *cli, unsigned int events,
+			  bool *invalidate_cli)
 {
 	ssize_t avail;
 	struct open_chunk *ochunk;
@@ -812,8 +813,8 @@ static bool object_put_body(struct client *cli, const char *user,
 			cli_out_end(cli);
 			return cli_err(cli, InternalError);
 		}
-		cli_writeq(cli, cont, strlen(cont), cli_cb_free, cont);
-		cli_write_start(cli);
+		atcp_writeq(&cli->wst, cont, strlen(cont), atcp_cb_free, cont);
+		atcp_write_start(&cli->wst);
 	}
 
 	avail = MIN(cli_req_avail(cli), content_len);
@@ -940,13 +941,13 @@ static bool object_put_acls(struct client *cli, const char *user,
 		return cli_err(cli, InternalError);
 	}
 
-	rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+	rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 	if (rc) {
 		free(hdr);
 		return true;
 	}
 
-	return cli_write_start(cli);
+	return atcp_write_start(&cli->wst);
 
 err_out_rb:
 	rc = txn->abort(txn);
@@ -990,10 +991,10 @@ void cli_in_end(struct client *cli)
 	cli->in_len = 0;
 }
 
-static bool object_get_more(struct client *cli, void *cb_data, bool done);
+static bool object_get_more(struct atcp_wr_state *wst, void *cb_data, bool done);
 
 /*
- * Return true iff cli_writeq was called. This is compatible with the
+ * Return true iff atcp_writeq was called. This is compatible with the
  * convention for cli continuation callbacks, so object_get_more can call us.
  */
 static bool object_get_poke(struct client *cli)
@@ -1026,7 +1027,7 @@ static bool object_get_poke(struct client *cli)
 	if (bytes == 0) {
 		if (!cli->in_len) {
 			cli_in_end(cli);
-			cli_write_start(cli);
+			atcp_write_start(&cli->wst);
 		}
 		free(buf);
 		return false;
@@ -1034,15 +1035,15 @@ static bool object_get_poke(struct client *cli)
 
 	cli->in_len -= bytes;
 	if (!cli->in_len) {
-		if (cli_writeq(cli, buf, bytes, cli_cb_free, buf))
+		if (atcp_writeq(&cli->wst, buf, bytes, atcp_cb_free, buf))
 			goto err_out;
 		cli_in_end(cli);
-		cli_write_start(cli);
+		atcp_write_start(&cli->wst);
 	} else {
-		if (cli_writeq(cli, buf, bytes, object_get_more, buf))
+		if (atcp_writeq(&cli->wst, buf, bytes, object_get_more, buf))
 			goto err_out;
-		if (cli_wqueued(cli) >= CLI_DATA_BUF_SZ)
-			cli_write_start(cli);
+		if (atcp_wqueued(&cli->wst) >= CLI_DATA_BUF_SZ)
+			atcp_write_start(&cli->wst);
 	}
 	return true;
 
@@ -1053,8 +1054,9 @@ err_out:
 }
 
 /* callback from the client side: a queued write is being disposed */
-static bool object_get_more(struct client *cli, void *cb_data, bool done)
+static bool object_get_more(struct atcp_wr_state *wst, void *cb_data, bool done)
 {
+	struct client *cli = wst->priv;
 
 	/* free now-written buffer */
 	free(cb_data);
@@ -1071,8 +1073,10 @@ static bool object_get_more(struct client *cli, void *cb_data, bool done)
 /* callback from the chunkd side: some data is available */
 static void object_get_event(struct open_chunk *ochunk)
 {
-	object_get_poke(ochunk->cli);
-	cli_write_run_compl();
+	struct client *cli = ochunk->cli;
+
+	object_get_poke(cli);
+	atcp_write_run_compl(&cli->wst);
 }
 
 static int object_node_count_up(struct db_obj_ent *obj)
@@ -1327,7 +1331,7 @@ static bool object_get_body(struct client *cli, const char *user,
 	if (!want_body) {
 		cli_in_end(cli);
 
-		rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+		rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 		if (rc) {
 			free(hdr);
 			return true;
@@ -1347,7 +1351,7 @@ static bool object_get_body(struct client *cli, const char *user,
 		if (!cli->in_len)
 			cli_in_end(cli);
 
-		rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+		rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 		if (rc) {
 			free(hdr);
 			goto err_out_in_end;
@@ -1365,21 +1369,21 @@ static bool object_get_body(struct client *cli, const char *user,
 		goto err_out_in_end;
 	memcpy(tmp, buf, bytes);
 
-	rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+	rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 	if (rc) {
 		free(hdr);
 		free(tmp);
 		return true;
 	}
 
-	if (cli_writeq(cli, tmp, bytes,
-		       cli->in_len ? object_get_more : cli_cb_free, tmp))
+	if (atcp_writeq(&cli->wst, tmp, bytes,
+		       cli->in_len ? object_get_more : atcp_cb_free, tmp))
 		goto err_out_in_end;
 
 start_write:
 	free(obj);
 	g_string_free(extra_hdr, TRUE);
-	return cli_write_start(cli);
+	return atcp_write_start(&cli->wst);
 
 err_out_in_end:
 	cli_in_end(cli);
diff --git a/server/server.c b/server/server.c
index 3398026..6bdccad 100644
--- a/server/server.c
+++ b/server/server.c
@@ -56,8 +56,6 @@
 const char *argp_program_version = PACKAGE_VERSION;
 
 enum {
-	CLI_MAX_WR_IOV		= 32,		/* max iov per writev(2) */
-
 	SFL_FOREGROUND		= (1 << 0),	/* run in foreground */
 };
 
@@ -488,65 +486,25 @@ bool stat_status(struct client *cli, GList *content)
 
 	if (asprintf(&str,
 		     "<p>Stats: "
-		     "poll %lu event %lu tcp_accept %lu opt_write %lu</p>\r\n"
-		     "<p>Debug: max_write_buf %lu</p>\r\n",
-		     tabled_srv.stats.poll,
-		     tabled_srv.stats.event,
-		     tabled_srv.stats.tcp_accept,
-		     tabled_srv.stats.opt_write,
-		     tabled_srv.stats.max_write_buf) < 0)
+		     "poll %llu event %llu tcp_accept %llu opt_write %llu</p>\r\n"
+		     "<p>Debug: max_write_buf %llu</p>\r\n",
+		     (unsigned long long) tabled_srv.stats.poll,
+		     (unsigned long long) tabled_srv.stats.event,
+		     (unsigned long long) tabled_srv.stats.tcp_accept,
+		     (unsigned long long) tabled_srv.stats.opt_write,
+		     (unsigned long long) tabled_srv.stats.max_write_buf) < 0)
 		return false;
 	content = g_list_append(content, str);
 	return true;
 }
 
-static void cli_write_complete(struct client *cli, struct client_write *tmp)
-{
-	list_del(&tmp->node);
-	list_add_tail(&tmp->node, &tabled_srv.write_compl_q);
-}
-
-static bool cli_write_free(struct client_write *tmp, bool done)
-{
-	struct client *cli = tmp->cb_cli;
-	bool rcb = false;
-
-	cli->write_cnt -= tmp->length;
-	list_del(&tmp->node);
-	if (tmp->cb)
-		rcb = tmp->cb(cli, tmp->cb_data, done);
-	free(tmp);
-
-	return rcb;
-}
-
-static void cli_write_free_all(struct client *cli)
-{
-	struct client_write *wr, *tmp;
-
-	cli_write_run_compl();
-	list_for_each_entry_safe(wr, tmp, &cli->write_q, node) {
-		cli_write_free(wr, false);
-	}
-}
-
-bool cli_write_run_compl(void)
-{
-	struct client_write *wr;
-	bool do_loop;
-
-	do_loop = false;
-	while (!list_empty(&tabled_srv.write_compl_q)) {
-		wr = list_entry(tabled_srv.write_compl_q.next,
-				struct client_write, node);
-		do_loop |= cli_write_free(wr, true);
-	}
-	return do_loop;
-}
-
 static void cli_free(struct client *cli)
 {
-	cli_write_free_all(cli);
+	if (cli->wst.write_cnt_max > tabled_srv.stats.max_write_buf)
+		tabled_srv.stats.max_write_buf = cli->wst.write_cnt_max;
+	tabled_srv.stats.opt_write += cli->wst.opt_write;
+
+	atcp_wr_exit(&cli->wst);
 
 	cli_out_end(cli);
 	cli_in_end(cli);
@@ -561,27 +519,28 @@ static void cli_free(struct client *cli)
 
 	hreq_free(&cli->req);
 
-	if (cli->write_cnt_max > tabled_srv.stats.max_write_buf)
-		tabled_srv.stats.max_write_buf = cli->write_cnt_max;
-
 	if (debugging)
 		applog(LOG_INFO, "client %s ended", cli->addr_host);
 
 	free(cli);
 }
 
-static bool cli_evt_dispose(struct client *cli, unsigned int events)
+static bool cli_evt_dispose(struct client *cli, unsigned int events,
+			    bool *invalidate_cli)
 {
 	/* if write queue is not empty, we should continue to get
 	 * poll callbacks here until it is
 	 */
-	if (list_empty(&cli->write_q))
+	if (atcp_wq_empty(&cli->wst)) {
 		cli_free(cli);
+		*invalidate_cli = true;
+	}
 
 	return false;
 }
 
-static bool cli_evt_recycle(struct client *cli, unsigned int events)
+static bool cli_evt_recycle(struct client *cli, unsigned int events,
+			    bool *invalidate_cli)
 {
 	unsigned int slop;
 
@@ -608,134 +567,6 @@ static bool cli_evt_recycle(struct client *cli, unsigned int events)
 	return true;
 }
 
-static void cli_writable(struct client *cli)
-{
-	int n_iov;
-	struct client_write *tmp;
-	ssize_t rc;
-	struct iovec iov[CLI_MAX_WR_IOV];
-
-	/* accumulate pending writes into iovec */
-	n_iov = 0;
-	list_for_each_entry(tmp, &cli->write_q, node) {
-		if (n_iov == CLI_MAX_WR_IOV)
-			break;
-		/* bleh, struct iovec should declare iov_base const */
-		iov[n_iov].iov_base = (void *) tmp->buf;
-		iov[n_iov].iov_len = tmp->togo;
-		n_iov++;
-	}
-
-	/* execute non-blocking write */
-do_write:
-	rc = writev(cli->fd, iov, n_iov);
-	if (rc < 0) {
-		if (errno == EINTR)
-			goto do_write;
-		if (errno != EAGAIN)
-			goto err_out;
-		return;
-	}
-
-	/* iterate through write queue, issuing completions based on
-	 * amount of data written
-	 */
-	while (rc > 0) {
-		int sz;
-
-		/* get pointer to first record on list */
-		tmp = list_entry(cli->write_q.next, struct client_write, node);
-
-		/* mark data consumed by decreasing tmp->len */
-		sz = (tmp->togo < rc) ? tmp->togo : rc;
-		tmp->togo -= sz;
-		tmp->buf += sz;
-		rc -= sz;
-
-		/* if tmp->len reaches zero, write is complete,
-		 * so schedule it for clean up (cannot call callback
-		 * right away or an endless recursion will result)
-		 */
-		if (tmp->togo == 0)
-			cli_write_complete(cli, tmp);
-	}
-
-	/* if we emptied the queue, clear write notification */
-	if (list_empty(&cli->write_q)) {
-		cli->writing = false;
-		if (event_del(&cli->write_ev) < 0) {
-			applog(LOG_WARNING, "cli_writable event_del");
-			goto err_out;
-		}
-	}
-
-	return;
-
-err_out:
-	cli->state = evt_dispose;
-	cli_write_free_all(cli);
-}
-
-bool cli_write_start(struct client *cli)
-{
-	if (list_empty(&cli->write_q))
-		return true;		/* loop, not poll */
-
-	/* if write-poll already active, nothing further to do */
-	if (cli->writing)
-		return false;		/* poll wait */
-
-	/* attempt optimistic write, in hopes of avoiding poll,
-	 * or at least refill the write buffers so as to not
-	 * get -immediately- called again by the kernel
-	 */
-	cli_writable(cli);
-	if (list_empty(&cli->write_q)) {
-		tabled_srv.stats.opt_write++;
-		return true;		/* loop, not poll */
-	}
-
-	if (event_add(&cli->write_ev, NULL) < 0) {
-		applog(LOG_WARNING, "cli_write event_add");
-		return true;		/* loop, not poll */
-	}
-
-	cli->writing = true;
-
-	return false;			/* poll wait */
-}
-
-int cli_writeq(struct client *cli, const void *buf, unsigned int buflen,
-		     cli_write_func cb, void *cb_data)
-{
-	struct client_write *wr;
-
-	if (!buf || !buflen)
-		return -EINVAL;
-
-	wr = calloc(1, sizeof(struct client_write));
-	if (!wr)
-		return -ENOMEM;
-
-	wr->buf = buf;
-	wr->togo = buflen;
-	wr->length = buflen;
-	wr->cb = cb;
-	wr->cb_data = cb_data;
-	wr->cb_cli = cli;
-	list_add_tail(&wr->node, &cli->write_q);
-	cli->write_cnt += buflen;
-	if (cli->write_cnt > cli->write_cnt_max)
-		cli->write_cnt_max = cli->write_cnt;
-
-	return 0;
-}
-
-size_t cli_wqueued(struct client *cli)
-{
-	return cli->write_cnt;
-}
-
 /*
  * Return:
  *   0: progress was NOT made (EOF)
@@ -771,12 +602,6 @@ do_read:
 	return rc != 0;
 }
 
-bool cli_cb_free(struct client *cli, void *cb_data, bool done)
-{
-	free(cb_data);
-	return false;
-}
-
 static int cli_write_list(struct client *cli, GList *list)
 {
 	int rc = 0;
@@ -784,8 +609,8 @@ static int cli_write_list(struct client *cli, GList *list)
 
 	tmp = list;
 	while (tmp) {
-		rc = cli_writeq(cli, tmp->data, strlen(tmp->data),
-			        cli_cb_free, tmp->data);
+		rc = atcp_writeq(&cli->wst, tmp->data, strlen(tmp->data),
+			        atcp_cb_free, tmp->data);
 		if (rc)
 			goto out;
 
@@ -870,14 +695,14 @@ bool cli_err_write(struct client *cli, char *hdr, char *content)
 
 	cli->state = evt_dispose;
 
-	rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+	rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 	if (rc)
 		return true;
-	rc = cli_writeq(cli, content, strlen(content), cli_cb_free, content);
+	rc = atcp_writeq(&cli->wst, content, strlen(content), atcp_cb_free, content);
 	if (rc)
 		return true;
 
-	return cli_write_start(cli);
+	return atcp_write_start(&cli->wst);
 }
 
 static bool cli_resp(struct client *cli, int http_status,
@@ -911,7 +736,7 @@ static bool cli_resp(struct client *cli, int http_status,
 	else
 		cli->state = evt_recycle;
 
-	rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+	rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 	if (rc) {
 		free(hdr);
 		cli->state = evt_dispose;
@@ -924,7 +749,7 @@ static bool cli_resp(struct client *cli, int http_status,
 		return true;
 	}
 
-	rcb = cli_write_start(cli);
+	rcb = atcp_write_start(&cli->wst);
 
 	if (cli->state == evt_recycle)
 		return true;
@@ -942,7 +767,8 @@ bool cli_resp_html(struct client *cli, int http_status, GList *content)
 	return cli_resp(cli, http_status, "text/html", content);
 }
 
-static bool cli_evt_http_req(struct client *cli, unsigned int events)
+static bool cli_evt_http_req(struct client *cli, unsigned int events,
+			    bool *invalidate_cli)
 {
 	struct http_req *req = &cli->req;
 	char *host, *auth, *content_len_str;
@@ -1195,7 +1021,8 @@ err_out:
 	return true;
 }
 
-static bool cli_evt_parse_hdr(struct client *cli, unsigned int events)
+static bool cli_evt_parse_hdr(struct client *cli, unsigned int events,
+			    bool *invalidate_cli)
 {
 	char *buf, *buf_eol;
 	bool eoh = false;
@@ -1252,7 +1079,8 @@ static bool cli_evt_parse_hdr(struct client *cli, unsigned int events)
 	return true;
 }
 
-static bool cli_evt_read_hdr(struct client *cli, unsigned int events)
+static bool cli_evt_read_hdr(struct client *cli, unsigned int events,
+			    bool *invalidate_cli)
 {
 	int rc = cli_read(cli);
 	if (rc <= 0) {
@@ -1268,7 +1096,8 @@ static bool cli_evt_read_hdr(struct client *cli, unsigned int events)
 	return true;
 }
 
-static bool cli_evt_parse_req(struct client *cli, unsigned int events)
+static bool cli_evt_parse_req(struct client *cli, unsigned int events,
+			    bool *invalidate_cli)
 {
 	char *sp1, *sp2, *buf;
 	enum errcode err_resp;
@@ -1336,7 +1165,8 @@ err_out:
 	return cli_err(cli, err_resp);
 }
 
-static bool cli_evt_read_req(struct client *cli, unsigned int events)
+static bool cli_evt_read_req(struct client *cli, unsigned int events,
+			    bool *invalidate_cli)
 {
 	int rc = cli_read(cli);
 	if (rc <= 0) {
@@ -1385,9 +1215,10 @@ static struct client *cli_alloc(bool is_status)
 		return NULL;
 	}
 
+	atcp_wr_init(&cli->wst, cli);
+
 	cli->state = evt_read_req;
 	cli->evt_table = is_status? evt_funcs_status: evt_funcs_server;
-	INIT_LIST_HEAD(&cli->write_q);
 	INIT_LIST_HEAD(&cli->out_ch);
 	cli->req_ptr = cli->req_buf;
 	memset(&cli->req, 0, sizeof(cli->req) - sizeof(cli->req.hdr));
@@ -1395,22 +1226,20 @@ static struct client *cli_alloc(bool is_status)
 	return cli;
 }
 
-static void tcp_cli_wr_event(int fd, short events, void *userdata)
-{
-	struct client *cli = userdata;
-
-	cli_writable(cli);
-	cli_write_run_compl();
-}
-
 static void tcp_cli_event(int fd, short events, void *userdata)
 {
 	struct client *cli = userdata;
 	bool loop;
+	bool invalidate_cli = false;
 
 	do {
-		loop = cli->evt_table[cli->state](cli, events);
-		loop |= cli_write_run_compl();
+		loop = cli->evt_table[cli->state](cli, events, &invalidate_cli);
+		if (invalidate_cli) {
+			cli = NULL;
+			break;
+		}
+
+		loop |= atcp_write_run_compl(&cli->wst);
 	} while (loop);
 }
 
@@ -1438,11 +1267,11 @@ static void tcp_srv_event(int fd, short events, void *userdata)
 		goto err_out;
 	}
 
+	atcp_wr_set_fd(&cli->wst, cli->fd);
+
 	tabled_srv.stats.tcp_accept++;
 
 	event_set(&cli->ev, cli->fd, EV_READ | EV_PERSIST, tcp_cli_event, cli);
-	event_set(&cli->write_ev, cli->fd, EV_WRITE | EV_PERSIST,
-		  tcp_cli_wr_event, cli);
 
 	/* mark non-blocking, for upcoming poll use */
 	if (fsetflags("tcp client", cli->fd, O_NONBLOCK) < 0)
@@ -2205,7 +2034,6 @@ int main (int argc, char *argv[])
 	struct event_base *event_base_rep;
 
 	INIT_LIST_HEAD(&tabled_srv.all_stor);
-	INIT_LIST_HEAD(&tabled_srv.write_compl_q);
 	tabled_srv.state_tdb = ST_TDB_INIT;
 	tabled_srv.rep_next_id = DBID_MIN;
 
diff --git a/server/status.c b/server/status.c
index e9fbb38..9af60f7 100644
--- a/server/status.c
+++ b/server/status.c
@@ -150,7 +150,8 @@ out_err:
 	return cli_err(cli, InternalError);
 }
 
-bool stat_evt_http_req(struct client *cli, unsigned int events)
+bool stat_evt_http_req(struct client *cli, unsigned int events,
+			bool *invalidate_cli)
 {
 	struct http_req *req = &cli->req;
 	char *method = req->method;
diff --git a/server/tabled.h b/server/tabled.h
index d4d2048..08fd9ad 100644
--- a/server/tabled.h
+++ b/server/tabled.h
@@ -31,6 +31,7 @@
 #include <elist.h>
 #include <tdb.h>
 #include <hail_log.h>
+#include "atcp.h"
 
 #ifndef ARRAY_SIZE
 #define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
@@ -102,20 +103,8 @@ struct storage_node {
 	int ref;		/* number of open_chunk or other */
 };
 
-typedef bool (*cli_evt_func)(struct client *, unsigned int);
-typedef bool (*cli_write_func)(struct client *, void *, bool);
-
-struct client_write {
-	const void		*buf;		/* write buffer pointer */
-	int			togo;		/* write buffer remainder */
-
-	int			length;		/* length for accounting */
-	cli_write_func		cb;		/* callback */
-	void			*cb_data;	/* data passed to cb */
-	struct client		*cb_cli;	/* cli passed to cb */
-
-	struct list_head	node;
-};
+typedef bool (*cli_evt_func)(struct client *, unsigned int,
+			     bool *invalidate_cli);
 
 /* an open chunkd client */
 struct open_chunk {
@@ -163,13 +152,9 @@ struct client {
 	int			fd;		/* socket */
 	bool			ev_active;
 	struct event		ev;
-	struct event		write_ev;
 
-	struct list_head	write_q;	/* list of async writes */
-	size_t			write_cnt;	/* water level */
-	bool			writing;
+	struct atcp_wr_state	wst;
 	/* some debugging stats */
-	size_t			write_cnt_max;
 
 	unsigned int		req_used;	/* amount of req_buf in use */
 	char			*req_ptr;	/* start of unexamined data */
@@ -216,12 +201,12 @@ enum st_net {
 };
 
 struct server_stats {
-	unsigned long		poll;		/* number polls */
-	unsigned long		event;		/* events dispatched */
-	unsigned long		tcp_accept;	/* TCP accepted cxns */
-	unsigned long		opt_write;	/* optimistic writes */
+	uint64_t		poll;		/* number polls */
+	uint64_t		event;		/* events dispatched */
+	uint64_t		tcp_accept;	/* TCP accepted cxns */
+	uint64_t		opt_write;	/* optimistic writes */
 
-	unsigned long		max_write_buf;
+	uint64_t		max_write_buf;
 };
 
 #define DBID_NONE      0
@@ -249,7 +234,6 @@ struct server {
 	struct event_base	*evbase_main;
 	int			ev_pipe[2];
 	struct event		pevt;
-	struct list_head	write_compl_q;	/* list of done writes */
 	bool			mc_delay;
 	struct event		mc_timer;
 
@@ -361,7 +345,8 @@ extern bool object_put(struct client *cli, const char *user, const char *bucket,
 		const char *key, long content_len, bool expect_cont);
 extern bool object_get(struct client *cli, const char *user, const char *bucket,
                        const char *key, bool want_body);
-extern bool cli_evt_http_data_in(struct client *cli, unsigned int events);
+extern bool cli_evt_http_data_in(struct client *cli, unsigned int events,
+				bool *invalidate_cli);
 extern void cli_out_end(struct client *cli);
 extern void cli_in_end(struct client *cli);
 
@@ -398,12 +383,6 @@ extern bool cli_err(struct client *cli, enum errcode code);
 extern bool cli_err_write(struct client *cli, char *hdr, char *content);
 extern bool cli_resp_xml(struct client *cli, int http_status, GList *content);
 extern bool cli_resp_html(struct client *cli, int http_status, GList *content);
-extern int cli_writeq(struct client *cli, const void *buf, unsigned int buflen,
-		     cli_write_func cb, void *cb_data);
-extern size_t cli_wqueued(struct client *cli);
-extern bool cli_cb_free(struct client *cli, void *cb_data, bool done);
-extern bool cli_write_start(struct client *cli);
-extern bool cli_write_run_compl(void);
 extern int cli_req_avail(struct client *cli);
 extern void applog(int prio, const char *fmt, ...);
 extern void cld_update_cb(void);
@@ -415,7 +394,8 @@ extern struct db_remote *tdb_find_remote_byname(const char *name);
 extern struct db_remote *tdb_find_remote_byid(int id);
 
 /* status.c */
-extern bool stat_evt_http_req(struct client *cli, unsigned int events);
+extern bool stat_evt_http_req(struct client *cli, unsigned int events,
+				bool *invalidate_cli);
 
 /* config.c */
 extern void read_config(void);
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [Fedora Clound]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux