[PATCH] tabled: use libhail's anet

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Updated for move to libhail.  Not committed on main branch until sme
more time passes.

 server/bucket.c |    8 -
 server/object.c |   56 +++++-----
 server/server.c |  294 ++++++++++++++------------------------------------------
 server/status.c |    3 
 server/tabled.h |   45 ++------
 5 files changed, 123 insertions(+), 283 deletions(-)

diff --git a/server/bucket.c b/server/bucket.c
index eb03e03..a81eca1 100644
--- a/server/bucket.c
+++ b/server/bucket.c
@@ -566,13 +566,13 @@ bool bucket_add(struct client *cli, const char *user, const char *bucket)
 		     bucket) < 0)
 		return cli_err(cli, InternalError);
 
-	rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+	rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 	if (rc) {
 		free(hdr);
 		return true;
 	}
 
-	return cli_write_start(cli);
+	return atcp_write_start(&cli->wst);
 
 err_out:
 	rc = txn->abort(txn);
@@ -718,13 +718,13 @@ bool bucket_del(struct client *cli, const char *user, const char *bucket)
 		     hutil_time2str(timestr, sizeof(timestr), time(NULL))) < 0)
 		return cli_err(cli, InternalError);
 
-	rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+	rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 	if (rc) {
 		free(hdr);
 		return true;
 	}
 
-	return cli_write_start(cli);
+	return atcp_write_start(&cli->wst);
 
 err_out:
 	rc = txn->abort(txn);
diff --git a/server/object.c b/server/object.c
index 3801e94..64cc8b6 100644
--- a/server/object.c
+++ b/server/object.c
@@ -227,13 +227,13 @@ bool object_del(struct client *cli, const char *user,
 		     hutil_time2str(timestr, sizeof(timestr), time(NULL))) < 0)
 		return cli_err(cli, InternalError);
 
-	rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+	rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 	if (rc) {
 		free(hdr);
 		return true;
 	}
 
-	return cli_write_start(cli);
+	return atcp_write_start(&cli->wst);
 
 err_out:
 	rc = txn->abort(txn);
@@ -525,13 +525,13 @@ static bool object_put_end(struct client *cli)
 		return cli_err(cli, InternalError);
 	}
 
-	rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+	rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 	if (rc) {
 		free(hdr);
 		return true;
 	}
 
-	return cli_write_start(cli);
+	return atcp_write_start(&cli->wst);
 
 err_out_rb:
 	rc = txn->abort(txn);
@@ -618,7 +618,8 @@ static int object_put_buf(struct client *cli, struct open_chunk *ochunk,
 	return 0;
 }
 
-bool cli_evt_http_data_in(struct client *cli, unsigned int events)
+bool cli_evt_http_data_in(struct client *cli, unsigned int events,
+			  bool *invalidate_cli)
 {
 	ssize_t avail;
 	struct open_chunk *ochunk;
@@ -812,8 +813,8 @@ static bool object_put_body(struct client *cli, const char *user,
 			cli_out_end(cli);
 			return cli_err(cli, InternalError);
 		}
-		cli_writeq(cli, cont, strlen(cont), cli_cb_free, cont);
-		cli_write_start(cli);
+		atcp_writeq(&cli->wst, cont, strlen(cont), atcp_cb_free, cont);
+		atcp_write_start(&cli->wst);
 	}
 
 	avail = MIN(cli_req_avail(cli), content_len);
@@ -940,13 +941,13 @@ static bool object_put_acls(struct client *cli, const char *user,
 		return cli_err(cli, InternalError);
 	}
 
-	rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+	rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 	if (rc) {
 		free(hdr);
 		return true;
 	}
 
-	return cli_write_start(cli);
+	return atcp_write_start(&cli->wst);
 
 err_out_rb:
 	rc = txn->abort(txn);
@@ -990,10 +991,10 @@ void cli_in_end(struct client *cli)
 	cli->in_len = 0;
 }
 
-static bool object_get_more(struct client *cli, void *cb_data, bool done);
+static bool object_get_more(struct atcp_wr_state *wst, void *cb_data, bool done);
 
 /*
- * Return true iff cli_writeq was called. This is compatible with the
+ * Return true iff atcp_writeq was called. This is compatible with the
  * convention for cli continuation callbacks, so object_get_more can call us.
  */
 static bool object_get_poke(struct client *cli)
@@ -1026,7 +1027,7 @@ static bool object_get_poke(struct client *cli)
 	if (bytes == 0) {
 		if (!cli->in_len) {
 			cli_in_end(cli);
-			cli_write_start(cli);
+			atcp_write_start(&cli->wst);
 		}
 		free(buf);
 		return false;
@@ -1034,15 +1035,15 @@ static bool object_get_poke(struct client *cli)
 
 	cli->in_len -= bytes;
 	if (!cli->in_len) {
-		if (cli_writeq(cli, buf, bytes, cli_cb_free, buf))
+		if (atcp_writeq(&cli->wst, buf, bytes, atcp_cb_free, buf))
 			goto err_out;
 		cli_in_end(cli);
-		cli_write_start(cli);
+		atcp_write_start(&cli->wst);
 	} else {
-		if (cli_writeq(cli, buf, bytes, object_get_more, buf))
+		if (atcp_writeq(&cli->wst, buf, bytes, object_get_more, buf))
 			goto err_out;
-		if (cli_wqueued(cli) >= CLI_DATA_BUF_SZ)
-			cli_write_start(cli);
+		if (atcp_wqueued(&cli->wst) >= CLI_DATA_BUF_SZ)
+			atcp_write_start(&cli->wst);
 	}
 	return true;
 
@@ -1053,8 +1054,9 @@ err_out:
 }
 
 /* callback from the client side: a queued write is being disposed */
-static bool object_get_more(struct client *cli, void *cb_data, bool done)
+static bool object_get_more(struct atcp_wr_state *wst, void *cb_data, bool done)
 {
+	struct client *cli = wst->priv;
 
 	/* free now-written buffer */
 	free(cb_data);
@@ -1071,8 +1073,10 @@ static bool object_get_more(struct client *cli, void *cb_data, bool done)
 /* callback from the chunkd side: some data is available */
 static void object_get_event(struct open_chunk *ochunk)
 {
-	object_get_poke(ochunk->cli);
-	cli_write_run_compl();
+	struct client *cli = ochunk->cli;
+
+	object_get_poke(cli);
+	atcp_write_run_compl(&cli->wst);
 }
 
 static int object_node_count_up(struct db_obj_ent *obj)
@@ -1327,7 +1331,7 @@ static bool object_get_body(struct client *cli, const char *user,
 	if (!want_body) {
 		cli_in_end(cli);
 
-		rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+		rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 		if (rc) {
 			free(hdr);
 			return true;
@@ -1347,7 +1351,7 @@ static bool object_get_body(struct client *cli, const char *user,
 		if (!cli->in_len)
 			cli_in_end(cli);
 
-		rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+		rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 		if (rc) {
 			free(hdr);
 			goto err_out_in_end;
@@ -1365,21 +1369,21 @@ static bool object_get_body(struct client *cli, const char *user,
 		goto err_out_in_end;
 	memcpy(tmp, buf, bytes);
 
-	rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+	rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 	if (rc) {
 		free(hdr);
 		free(tmp);
 		return true;
 	}
 
-	if (cli_writeq(cli, tmp, bytes,
-		       cli->in_len ? object_get_more : cli_cb_free, tmp))
+	if (atcp_writeq(&cli->wst, tmp, bytes,
+		       cli->in_len ? object_get_more : atcp_cb_free, tmp))
 		goto err_out_in_end;
 
 start_write:
 	free(obj);
 	g_string_free(extra_hdr, TRUE);
-	return cli_write_start(cli);
+	return atcp_write_start(&cli->wst);
 
 err_out_in_end:
 	cli_in_end(cli);
diff --git a/server/server.c b/server/server.c
index 3398026..a4c9928 100644
--- a/server/server.c
+++ b/server/server.c
@@ -56,8 +56,6 @@
 const char *argp_program_version = PACKAGE_VERSION;
 
 enum {
-	CLI_MAX_WR_IOV		= 32,		/* max iov per writev(2) */
-
 	SFL_FOREGROUND		= (1 << 0),	/* run in foreground */
 };
 
@@ -488,65 +486,25 @@ bool stat_status(struct client *cli, GList *content)
 
 	if (asprintf(&str,
 		     "<p>Stats: "
-		     "poll %lu event %lu tcp_accept %lu opt_write %lu</p>\r\n"
-		     "<p>Debug: max_write_buf %lu</p>\r\n",
-		     tabled_srv.stats.poll,
-		     tabled_srv.stats.event,
-		     tabled_srv.stats.tcp_accept,
-		     tabled_srv.stats.opt_write,
-		     tabled_srv.stats.max_write_buf) < 0)
+		     "poll %llu event %llu tcp_accept %llu opt_write %llu</p>\r\n"
+		     "<p>Debug: max_write_buf %llu</p>\r\n",
+		     (unsigned long long) tabled_srv.stats.poll,
+		     (unsigned long long) tabled_srv.stats.event,
+		     (unsigned long long) tabled_srv.stats.tcp_accept,
+		     (unsigned long long) tabled_srv.stats.opt_write,
+		     (unsigned long long) tabled_srv.stats.max_write_buf) < 0)
 		return false;
 	content = g_list_append(content, str);
 	return true;
 }
 
-static void cli_write_complete(struct client *cli, struct client_write *tmp)
-{
-	list_del(&tmp->node);
-	list_add_tail(&tmp->node, &tabled_srv.write_compl_q);
-}
-
-static bool cli_write_free(struct client_write *tmp, bool done)
-{
-	struct client *cli = tmp->cb_cli;
-	bool rcb = false;
-
-	cli->write_cnt -= tmp->length;
-	list_del(&tmp->node);
-	if (tmp->cb)
-		rcb = tmp->cb(cli, tmp->cb_data, done);
-	free(tmp);
-
-	return rcb;
-}
-
-static void cli_write_free_all(struct client *cli)
-{
-	struct client_write *wr, *tmp;
-
-	cli_write_run_compl();
-	list_for_each_entry_safe(wr, tmp, &cli->write_q, node) {
-		cli_write_free(wr, false);
-	}
-}
-
-bool cli_write_run_compl(void)
-{
-	struct client_write *wr;
-	bool do_loop;
-
-	do_loop = false;
-	while (!list_empty(&tabled_srv.write_compl_q)) {
-		wr = list_entry(tabled_srv.write_compl_q.next,
-				struct client_write, node);
-		do_loop |= cli_write_free(wr, true);
-	}
-	return do_loop;
-}
-
 static void cli_free(struct client *cli)
 {
-	cli_write_free_all(cli);
+	if (cli->wst.write_cnt_max > tabled_srv.stats.max_write_buf)
+		tabled_srv.stats.max_write_buf = cli->wst.write_cnt_max;
+	tabled_srv.stats.opt_write += cli->wst.opt_write;
+
+	atcp_wr_exit(&cli->wst);
 
 	cli_out_end(cli);
 	cli_in_end(cli);
@@ -561,27 +519,28 @@ static void cli_free(struct client *cli)
 
 	hreq_free(&cli->req);
 
-	if (cli->write_cnt_max > tabled_srv.stats.max_write_buf)
-		tabled_srv.stats.max_write_buf = cli->write_cnt_max;
-
 	if (debugging)
 		applog(LOG_INFO, "client %s ended", cli->addr_host);
 
 	free(cli);
 }
 
-static bool cli_evt_dispose(struct client *cli, unsigned int events)
+static bool cli_evt_dispose(struct client *cli, unsigned int events,
+			    bool *invalidate_cli)
 {
 	/* if write queue is not empty, we should continue to get
 	 * poll callbacks here until it is
 	 */
-	if (list_empty(&cli->write_q))
+	if (atcp_wq_empty(&cli->wst)) {
 		cli_free(cli);
+		*invalidate_cli = true;
+	}
 
 	return false;
 }
 
-static bool cli_evt_recycle(struct client *cli, unsigned int events)
+static bool cli_evt_recycle(struct client *cli, unsigned int events,
+			    bool *invalidate_cli)
 {
 	unsigned int slop;
 
@@ -608,134 +567,6 @@ static bool cli_evt_recycle(struct client *cli, unsigned int events)
 	return true;
 }
 
-static void cli_writable(struct client *cli)
-{
-	int n_iov;
-	struct client_write *tmp;
-	ssize_t rc;
-	struct iovec iov[CLI_MAX_WR_IOV];
-
-	/* accumulate pending writes into iovec */
-	n_iov = 0;
-	list_for_each_entry(tmp, &cli->write_q, node) {
-		if (n_iov == CLI_MAX_WR_IOV)
-			break;
-		/* bleh, struct iovec should declare iov_base const */
-		iov[n_iov].iov_base = (void *) tmp->buf;
-		iov[n_iov].iov_len = tmp->togo;
-		n_iov++;
-	}
-
-	/* execute non-blocking write */
-do_write:
-	rc = writev(cli->fd, iov, n_iov);
-	if (rc < 0) {
-		if (errno == EINTR)
-			goto do_write;
-		if (errno != EAGAIN)
-			goto err_out;
-		return;
-	}
-
-	/* iterate through write queue, issuing completions based on
-	 * amount of data written
-	 */
-	while (rc > 0) {
-		int sz;
-
-		/* get pointer to first record on list */
-		tmp = list_entry(cli->write_q.next, struct client_write, node);
-
-		/* mark data consumed by decreasing tmp->len */
-		sz = (tmp->togo < rc) ? tmp->togo : rc;
-		tmp->togo -= sz;
-		tmp->buf += sz;
-		rc -= sz;
-
-		/* if tmp->len reaches zero, write is complete,
-		 * so schedule it for clean up (cannot call callback
-		 * right away or an endless recursion will result)
-		 */
-		if (tmp->togo == 0)
-			cli_write_complete(cli, tmp);
-	}
-
-	/* if we emptied the queue, clear write notification */
-	if (list_empty(&cli->write_q)) {
-		cli->writing = false;
-		if (event_del(&cli->write_ev) < 0) {
-			applog(LOG_WARNING, "cli_writable event_del");
-			goto err_out;
-		}
-	}
-
-	return;
-
-err_out:
-	cli->state = evt_dispose;
-	cli_write_free_all(cli);
-}
-
-bool cli_write_start(struct client *cli)
-{
-	if (list_empty(&cli->write_q))
-		return true;		/* loop, not poll */
-
-	/* if write-poll already active, nothing further to do */
-	if (cli->writing)
-		return false;		/* poll wait */
-
-	/* attempt optimistic write, in hopes of avoiding poll,
-	 * or at least refill the write buffers so as to not
-	 * get -immediately- called again by the kernel
-	 */
-	cli_writable(cli);
-	if (list_empty(&cli->write_q)) {
-		tabled_srv.stats.opt_write++;
-		return true;		/* loop, not poll */
-	}
-
-	if (event_add(&cli->write_ev, NULL) < 0) {
-		applog(LOG_WARNING, "cli_write event_add");
-		return true;		/* loop, not poll */
-	}
-
-	cli->writing = true;
-
-	return false;			/* poll wait */
-}
-
-int cli_writeq(struct client *cli, const void *buf, unsigned int buflen,
-		     cli_write_func cb, void *cb_data)
-{
-	struct client_write *wr;
-
-	if (!buf || !buflen)
-		return -EINVAL;
-
-	wr = calloc(1, sizeof(struct client_write));
-	if (!wr)
-		return -ENOMEM;
-
-	wr->buf = buf;
-	wr->togo = buflen;
-	wr->length = buflen;
-	wr->cb = cb;
-	wr->cb_data = cb_data;
-	wr->cb_cli = cli;
-	list_add_tail(&wr->node, &cli->write_q);
-	cli->write_cnt += buflen;
-	if (cli->write_cnt > cli->write_cnt_max)
-		cli->write_cnt_max = cli->write_cnt;
-
-	return 0;
-}
-
-size_t cli_wqueued(struct client *cli)
-{
-	return cli->write_cnt;
-}
-
 /*
  * Return:
  *   0: progress was NOT made (EOF)
@@ -771,12 +602,6 @@ do_read:
 	return rc != 0;
 }
 
-bool cli_cb_free(struct client *cli, void *cb_data, bool done)
-{
-	free(cb_data);
-	return false;
-}
-
 static int cli_write_list(struct client *cli, GList *list)
 {
 	int rc = 0;
@@ -784,8 +609,8 @@ static int cli_write_list(struct client *cli, GList *list)
 
 	tmp = list;
 	while (tmp) {
-		rc = cli_writeq(cli, tmp->data, strlen(tmp->data),
-			        cli_cb_free, tmp->data);
+		rc = atcp_writeq(&cli->wst, tmp->data, strlen(tmp->data),
+			        atcp_cb_free, tmp->data);
 		if (rc)
 			goto out;
 
@@ -870,14 +695,14 @@ bool cli_err_write(struct client *cli, char *hdr, char *content)
 
 	cli->state = evt_dispose;
 
-	rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+	rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 	if (rc)
 		return true;
-	rc = cli_writeq(cli, content, strlen(content), cli_cb_free, content);
+	rc = atcp_writeq(&cli->wst, content, strlen(content), atcp_cb_free, content);
 	if (rc)
 		return true;
 
-	return cli_write_start(cli);
+	return atcp_write_start(&cli->wst);
 }
 
 static bool cli_resp(struct client *cli, int http_status,
@@ -911,7 +736,7 @@ static bool cli_resp(struct client *cli, int http_status,
 	else
 		cli->state = evt_recycle;
 
-	rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+	rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
 	if (rc) {
 		free(hdr);
 		cli->state = evt_dispose;
@@ -924,7 +749,7 @@ static bool cli_resp(struct client *cli, int http_status,
 		return true;
 	}
 
-	rcb = cli_write_start(cli);
+	rcb = atcp_write_start(&cli->wst);
 
 	if (cli->state == evt_recycle)
 		return true;
@@ -942,7 +767,8 @@ bool cli_resp_html(struct client *cli, int http_status, GList *content)
 	return cli_resp(cli, http_status, "text/html", content);
 }
 
-static bool cli_evt_http_req(struct client *cli, unsigned int events)
+static bool cli_evt_http_req(struct client *cli, unsigned int events,
+			    bool *invalidate_cli)
 {
 	struct http_req *req = &cli->req;
 	char *host, *auth, *content_len_str;
@@ -1195,7 +1021,8 @@ err_out:
 	return true;
 }
 
-static bool cli_evt_parse_hdr(struct client *cli, unsigned int events)
+static bool cli_evt_parse_hdr(struct client *cli, unsigned int events,
+			    bool *invalidate_cli)
 {
 	char *buf, *buf_eol;
 	bool eoh = false;
@@ -1252,7 +1079,8 @@ static bool cli_evt_parse_hdr(struct client *cli, unsigned int events)
 	return true;
 }
 
-static bool cli_evt_read_hdr(struct client *cli, unsigned int events)
+static bool cli_evt_read_hdr(struct client *cli, unsigned int events,
+			    bool *invalidate_cli)
 {
 	int rc = cli_read(cli);
 	if (rc <= 0) {
@@ -1268,7 +1096,8 @@ static bool cli_evt_read_hdr(struct client *cli, unsigned int events)
 	return true;
 }
 
-static bool cli_evt_parse_req(struct client *cli, unsigned int events)
+static bool cli_evt_parse_req(struct client *cli, unsigned int events,
+			    bool *invalidate_cli)
 {
 	char *sp1, *sp2, *buf;
 	enum errcode err_resp;
@@ -1336,7 +1165,8 @@ err_out:
 	return cli_err(cli, err_resp);
 }
 
-static bool cli_evt_read_req(struct client *cli, unsigned int events)
+static bool cli_evt_read_req(struct client *cli, unsigned int events,
+			    bool *invalidate_cli)
 {
 	int rc = cli_read(cli);
 	if (rc <= 0) {
@@ -1374,6 +1204,32 @@ static cli_evt_func evt_funcs_status[] = {
 	[evt_recycle]		= cli_evt_recycle,
 };
 
+static int cli_le_wset(void *ev_info, int fd, atcp_ev_func cb, void *cb_data)
+{
+	struct event *ev = ev_info;
+
+	event_set(ev, fd, EV_WRITE | EV_PERSIST, cb, cb_data);
+	return 0;
+}
+
+static int cli_le_add(void *ev_info, const struct timeval *tv)
+{
+	struct event *ev = ev_info;
+	return event_add(ev, tv);
+}
+
+static int cli_le_del(void *ev_info)
+{
+	struct event *ev = ev_info;
+	return event_del(ev);
+}
+
+static const struct atcp_wr_ops libevent_wr_ops = {
+	.ev_wset	= cli_le_wset,
+	.ev_add		= cli_le_add,
+	.ev_del		= cli_le_del,
+};
+
 static struct client *cli_alloc(bool is_status)
 {
 	struct client *cli;
@@ -1385,9 +1241,10 @@ static struct client *cli_alloc(bool is_status)
 		return NULL;
 	}
 
+	atcp_wr_init(&cli->wst, &libevent_wr_ops, &cli->write_ev, cli);
+
 	cli->state = evt_read_req;
 	cli->evt_table = is_status? evt_funcs_status: evt_funcs_server;
-	INIT_LIST_HEAD(&cli->write_q);
 	INIT_LIST_HEAD(&cli->out_ch);
 	cli->req_ptr = cli->req_buf;
 	memset(&cli->req, 0, sizeof(cli->req) - sizeof(cli->req.hdr));
@@ -1395,22 +1252,20 @@ static struct client *cli_alloc(bool is_status)
 	return cli;
 }
 
-static void tcp_cli_wr_event(int fd, short events, void *userdata)
-{
-	struct client *cli = userdata;
-
-	cli_writable(cli);
-	cli_write_run_compl();
-}
-
 static void tcp_cli_event(int fd, short events, void *userdata)
 {
 	struct client *cli = userdata;
 	bool loop;
+	bool invalidate_cli = false;
 
 	do {
-		loop = cli->evt_table[cli->state](cli, events);
-		loop |= cli_write_run_compl();
+		loop = cli->evt_table[cli->state](cli, events, &invalidate_cli);
+		if (invalidate_cli) {
+			cli = NULL;
+			break;
+		}
+
+		loop |= atcp_write_run_compl(&cli->wst);
 	} while (loop);
 }
 
@@ -1438,11 +1293,11 @@ static void tcp_srv_event(int fd, short events, void *userdata)
 		goto err_out;
 	}
 
+	atcp_wr_set_fd(&cli->wst, cli->fd);
+
 	tabled_srv.stats.tcp_accept++;
 
 	event_set(&cli->ev, cli->fd, EV_READ | EV_PERSIST, tcp_cli_event, cli);
-	event_set(&cli->write_ev, cli->fd, EV_WRITE | EV_PERSIST,
-		  tcp_cli_wr_event, cli);
 
 	/* mark non-blocking, for upcoming poll use */
 	if (fsetflags("tcp client", cli->fd, O_NONBLOCK) < 0)
@@ -2205,7 +2060,6 @@ int main (int argc, char *argv[])
 	struct event_base *event_base_rep;
 
 	INIT_LIST_HEAD(&tabled_srv.all_stor);
-	INIT_LIST_HEAD(&tabled_srv.write_compl_q);
 	tabled_srv.state_tdb = ST_TDB_INIT;
 	tabled_srv.rep_next_id = DBID_MIN;
 
diff --git a/server/status.c b/server/status.c
index e9fbb38..9af60f7 100644
--- a/server/status.c
+++ b/server/status.c
@@ -150,7 +150,8 @@ out_err:
 	return cli_err(cli, InternalError);
 }
 
-bool stat_evt_http_req(struct client *cli, unsigned int events)
+bool stat_evt_http_req(struct client *cli, unsigned int events,
+			bool *invalidate_cli)
 {
 	struct http_req *req = &cli->req;
 	char *method = req->method;
diff --git a/server/tabled.h b/server/tabled.h
index d4d2048..4d3a2d9 100644
--- a/server/tabled.h
+++ b/server/tabled.h
@@ -31,6 +31,7 @@
 #include <elist.h>
 #include <tdb.h>
 #include <hail_log.h>
+#include <anet.h>
 
 #ifndef ARRAY_SIZE
 #define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
@@ -102,20 +103,8 @@ struct storage_node {
 	int ref;		/* number of open_chunk or other */
 };
 
-typedef bool (*cli_evt_func)(struct client *, unsigned int);
-typedef bool (*cli_write_func)(struct client *, void *, bool);
-
-struct client_write {
-	const void		*buf;		/* write buffer pointer */
-	int			togo;		/* write buffer remainder */
-
-	int			length;		/* length for accounting */
-	cli_write_func		cb;		/* callback */
-	void			*cb_data;	/* data passed to cb */
-	struct client		*cb_cli;	/* cli passed to cb */
-
-	struct list_head	node;
-};
+typedef bool (*cli_evt_func)(struct client *, unsigned int,
+			     bool *invalidate_cli);
 
 /* an open chunkd client */
 struct open_chunk {
@@ -165,11 +154,8 @@ struct client {
 	struct event		ev;
 	struct event		write_ev;
 
-	struct list_head	write_q;	/* list of async writes */
-	size_t			write_cnt;	/* water level */
-	bool			writing;
+	struct atcp_wr_state	wst;
 	/* some debugging stats */
-	size_t			write_cnt_max;
 
 	unsigned int		req_used;	/* amount of req_buf in use */
 	char			*req_ptr;	/* start of unexamined data */
@@ -216,12 +202,12 @@ enum st_net {
 };
 
 struct server_stats {
-	unsigned long		poll;		/* number polls */
-	unsigned long		event;		/* events dispatched */
-	unsigned long		tcp_accept;	/* TCP accepted cxns */
-	unsigned long		opt_write;	/* optimistic writes */
+	uint64_t		poll;		/* number polls */
+	uint64_t		event;		/* events dispatched */
+	uint64_t		tcp_accept;	/* TCP accepted cxns */
+	uint64_t		opt_write;	/* optimistic writes */
 
-	unsigned long		max_write_buf;
+	uint64_t		max_write_buf;
 };
 
 #define DBID_NONE      0
@@ -249,7 +235,6 @@ struct server {
 	struct event_base	*evbase_main;
 	int			ev_pipe[2];
 	struct event		pevt;
-	struct list_head	write_compl_q;	/* list of done writes */
 	bool			mc_delay;
 	struct event		mc_timer;
 
@@ -361,7 +346,8 @@ extern bool object_put(struct client *cli, const char *user, const char *bucket,
 		const char *key, long content_len, bool expect_cont);
 extern bool object_get(struct client *cli, const char *user, const char *bucket,
                        const char *key, bool want_body);
-extern bool cli_evt_http_data_in(struct client *cli, unsigned int events);
+extern bool cli_evt_http_data_in(struct client *cli, unsigned int events,
+				bool *invalidate_cli);
 extern void cli_out_end(struct client *cli);
 extern void cli_in_end(struct client *cli);
 
@@ -398,12 +384,6 @@ extern bool cli_err(struct client *cli, enum errcode code);
 extern bool cli_err_write(struct client *cli, char *hdr, char *content);
 extern bool cli_resp_xml(struct client *cli, int http_status, GList *content);
 extern bool cli_resp_html(struct client *cli, int http_status, GList *content);
-extern int cli_writeq(struct client *cli, const void *buf, unsigned int buflen,
-		     cli_write_func cb, void *cb_data);
-extern size_t cli_wqueued(struct client *cli);
-extern bool cli_cb_free(struct client *cli, void *cb_data, bool done);
-extern bool cli_write_start(struct client *cli);
-extern bool cli_write_run_compl(void);
 extern int cli_req_avail(struct client *cli);
 extern void applog(int prio, const char *fmt, ...);
 extern void cld_update_cb(void);
@@ -415,7 +395,8 @@ extern struct db_remote *tdb_find_remote_byname(const char *name);
 extern struct db_remote *tdb_find_remote_byid(int id);
 
 /* status.c */
-extern bool stat_evt_http_req(struct client *cli, unsigned int events);
+extern bool stat_evt_http_req(struct client *cli, unsigned int events,
+				bool *invalidate_cli);
 
 /* config.c */
 extern void read_config(void);
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [Fedora Clound]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux