This is step #1. Other steps in the process: 2) update server/atcp.c and itd/*.c in tandem, until they have matching TCP-write code. This should be straightforward, as both are based on the same codebase (tabled). 3) move atcp to libhail 4) remove TCP-write code from tabled, itd 5) update atcp to support SSL, sendfile 6) update chunkd to support atcp (requires step #5) All this code bears the same lineage, so it shouldn't be too difficult. Also note, this is a first draft with embedded libevent dependencies. I agree with zaitcev that the goal should be to eliminate these. atcp is wonderfully generic at present; not even a GLib dependency, IIRC. server/Makefile.am | 1 server/atcp.c | 243 +++++++++++++++++++++++++++++++++++++++++++++++++++++ server/atcp.h | 90 +++++++++++++++++++ server/bucket.c | 8 - server/object.c | 56 ++++++------ server/server.c | 237 +++++---------------------------------------------- server/tabled.h | 37 +------- 7 files changed, 400 insertions(+), 272 deletions(-) diff --git a/server/Makefile.am b/server/Makefile.am index 5b53a0a..5e0abd5 100644 --- a/server/Makefile.am +++ b/server/Makefile.am @@ -4,6 +4,7 @@ INCLUDES = -I$(top_srcdir)/include @GLIB_CFLAGS@ @HAIL_CFLAGS@ sbin_PROGRAMS = tabled tdbadm tabled_SOURCES = tabled.h \ + atcp.c atcp.h \ bucket.c cldu.c config.c metarep.c object.c replica.c \ server.c status.c storage.c storparse.c util.c tabled_LDADD = ../lib/libtdb.a \ diff --git a/server/atcp.c b/server/atcp.c new file mode 100644 index 0000000..dac5b91 --- /dev/null +++ b/server/atcp.c @@ -0,0 +1,243 @@ + +/* + * Copyright 2010 Red Hat, Inc. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; see the file COPYING. If not, write to + * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + +#define _GNU_SOURCE +#include "tabled-config.h" + +#include <string.h> +#include <stdlib.h> +#include <errno.h> +#include <sys/uio.h> +#include "atcp.h" + +bool atcp_cb_free(void *cb_data1, void *cb_data2, bool done) +{ + /* in typical usage, cb_data1 is the owner of cb_data2, + and has a longer lifetime. Therefore, by convention, + cb_data2 is the buffer to be released. + */ + free(cb_data2); + return false; +} + +static void atcp_write_complete(struct atcp_write *tmp) +{ + struct atcp_wr_state *wst = tmp->wst; + + list_del(&tmp->node); + list_add_tail(&tmp->node, &wst->write_compl_q); +} + +static bool atcp_write_free(struct atcp_write *tmp, bool done) +{ + struct atcp_wr_state *wst = tmp->wst; + bool rcb = false; + + wst->write_cnt -= tmp->length; + list_del_init(&tmp->node); + if (tmp->cb) + rcb = tmp->cb(tmp->cb_data1, tmp->cb_data2, done); + free(tmp); + + return rcb; +} + +bool atcp_write_run_compl(struct atcp_wr_state *wst) +{ + struct atcp_write *wr; + bool do_loop; + + do_loop = false; + while (!list_empty(&wst->write_compl_q)) { + wr = list_entry(wst->write_compl_q.next, + struct atcp_write, node); + do_loop |= atcp_write_free(wr, true); + } + return do_loop; +} + +void atcp_write_free_all(struct atcp_wr_state *wst) +{ + struct atcp_write *wr, *tmp; + + atcp_write_run_compl(wst); + list_for_each_entry_safe(wr, tmp, &wst->write_q, node) { + atcp_write_free(wr, false); + } +} + +size_t atcp_wqueued(struct atcp_wr_state *wst) +{ + return wst->write_cnt; +} + +static bool atcp_writable(struct atcp_wr_state *wst) +{ + int n_iov; + struct atcp_write *tmp; + ssize_t rc; + struct iovec iov[ATCP_MAX_WR_IOV]; + + /* accumulate pending writes into iovec */ + n_iov = 0; + 
list_for_each_entry(tmp, &wst->write_q, node) { + if (n_iov == ATCP_MAX_WR_IOV) + break; + /* bleh, struct iovec should declare iov_base const */ + iov[n_iov].iov_base = (void *) tmp->buf; + iov[n_iov].iov_len = tmp->togo; + n_iov++; + } + + /* execute non-blocking write */ +do_write: + rc = writev(wst->fd, iov, n_iov); + if (rc < 0) { + if (errno == EINTR) + goto do_write; + if (errno != EAGAIN) + goto err_out; + return true; + } + + /* iterate through write queue, issuing completions based on + * amount of data written + */ + while (rc > 0) { + int sz; + + /* get pointer to first record on list */ + tmp = list_entry(wst->write_q.next, struct atcp_write, node); + + /* mark data consumed by decreasing tmp->len */ + sz = (tmp->togo < rc) ? tmp->togo : rc; + tmp->togo -= sz; + tmp->buf += sz; + rc -= sz; + + /* if tmp->len reaches zero, write is complete, + * so schedule it for clean up (cannot call callback + * right away or an endless recursion will result) + */ + if (tmp->togo == 0) + atcp_write_complete(tmp); + } + + /* if we emptied the queue, clear write notification */ + if (list_empty(&wst->write_q)) { + wst->writing = false; + if (event_del(&wst->write_ev) < 0) + goto err_out; + } + + return true; + +err_out: + atcp_write_free_all(wst); + return false; +} + +static void atcp_wr_event(int fd, short events, void *userdata) +{ + struct atcp_wr_state *wst = userdata; + + atcp_writable(wst); + atcp_write_run_compl(wst); +} + +void atcp_wr_set_fd(struct atcp_wr_state *wst, int fd) +{ + wst->fd = fd; + + event_set(&wst->write_ev, fd, EV_WRITE | EV_PERSIST, + atcp_wr_event, wst); +} + +bool atcp_write_start(struct atcp_wr_state *wst) +{ + if (list_empty(&wst->write_q)) + return true; /* loop, not poll */ + + /* if write-poll already active, nothing further to do */ + if (wst->writing) + return false; /* poll wait */ + + /* attempt optimistic write, in hopes of avoiding poll, + * or at least refill the write buffers so as to not + * get -immediately- called again by 
the kernel + */ + atcp_writable(wst); + if (list_empty(&wst->write_q)) { + wst->opt_write++; + return true; /* loop, not poll */ + } + + if (event_add(&wst->write_ev, NULL) < 0) + return true; /* loop, not poll */ + + wst->writing = true; + + return false; /* poll wait */ +} + +int atcp_writeq(struct atcp_wr_state *wst, const void *buf, unsigned int buflen, + atcp_write_func cb, void *cb_data1, void *cb_data2) +{ + struct atcp_write *wr; + + if (!buf || !buflen) + return -EINVAL; + + wr = calloc(1, sizeof(struct atcp_write)); + if (!wr) + return -ENOMEM; + + wr->buf = buf; + wr->togo = buflen; + wr->length = buflen; + wr->cb = cb; + wr->cb_data1 = cb_data1; + wr->cb_data2 = cb_data2; + wr->wst = wst; + list_add_tail(&wr->node, &wst->write_q); + wst->write_cnt += buflen; + if (wst->write_cnt > wst->write_cnt_max) + wst->write_cnt_max = wst->write_cnt; + + return 0; +} + +void atcp_wr_exit(struct atcp_wr_state *wst) +{ + if (!wst) + return; + + atcp_write_free_all(wst); +} + +void atcp_wr_init(struct atcp_wr_state *wst) +{ + memset(wst, 0, sizeof(*wst)); + + INIT_LIST_HEAD(&wst->write_q); + INIT_LIST_HEAD(&wst->write_compl_q); + + wst->fd = -1; +} + diff --git a/server/atcp.h b/server/atcp.h new file mode 100644 index 0000000..996a59e --- /dev/null +++ b/server/atcp.h @@ -0,0 +1,90 @@ +#ifndef __ATCP_H__ +#define __ATCP_H__ + +/* + * Copyright 2010 Red Hat, Inc. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; see the file COPYING. 
If not, write to + * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + +#include <stdint.h> +#include <stdbool.h> +#include <event.h> +#include <elist.h> + +enum { + ATCP_MAX_WR_IOV = 32, /* max iov per writev(2) */ +}; + +struct atcp_wr_state { + int fd; /* our socket */ + + bool writing; /* actively trying to write? */ + + size_t write_cnt; /* water level */ + size_t write_cnt_max; + + struct list_head write_q; /* list of async writes */ + struct list_head write_compl_q; /* list of done writes */ + + struct event write_ev; + + /* various statistics */ + uint64_t opt_write; /* optimistic writes */ +}; + +typedef bool (*atcp_write_func)(void *, void *, bool); + +struct atcp_write { + const void *buf; /* write buffer pointer */ + int togo; /* write buffer remainder */ + + int length; /* length for accounting */ + atcp_write_func cb; /* callback */ + void *cb_data1; /* first data passed to cb */ + void *cb_data2; /* second data passed to cb */ + + struct atcp_wr_state *wst; /* our parent */ + + struct list_head node; /* write_[compl_]q list node */ +}; + +/* setup and teardown atcp write state */ +extern void atcp_wr_exit(struct atcp_wr_state *wst); +extern void atcp_wr_init(struct atcp_wr_state *wst); + +/* generic write callback, that call free(cb_data2) */ +extern bool atcp_cb_free(void *cb_data1, void *cb_data2, bool done); + +/* clear all write queues immediately, even if not complete */ +extern void atcp_write_free_all(struct atcp_wr_state *wst); + +/* complete all writes found on completion queue */ +extern bool atcp_write_run_compl(struct atcp_wr_state *wst); + +/* total number of octets queued at this moment */ +extern size_t atcp_wqueued(struct atcp_wr_state *wst); + +/* initialize internal fd, event setup */ +extern void atcp_wr_set_fd(struct atcp_wr_state *wst, int fd); + +/* add a buffer to the write queue */ +extern int atcp_writeq(struct atcp_wr_state *wst, const void *buf, unsigned int buflen, + atcp_write_func cb, void 
*cb_data1, void *cb_data2); + +/* begin pushing write queue to socket */ +extern bool atcp_write_start(struct atcp_wr_state *wst); + +#endif /* __ATCP_H__ */ diff --git a/server/bucket.c b/server/bucket.c index eb03e03..982ed62 100644 --- a/server/bucket.c +++ b/server/bucket.c @@ -566,13 +566,13 @@ bool bucket_add(struct client *cli, const char *user, const char *bucket) bucket) < 0) return cli_err(cli, InternalError); - rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr); + rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr); if (rc) { free(hdr); return true; } - return cli_write_start(cli); + return atcp_write_start(&cli->wst); err_out: rc = txn->abort(txn); @@ -718,13 +718,13 @@ bool bucket_del(struct client *cli, const char *user, const char *bucket) hutil_time2str(timestr, sizeof(timestr), time(NULL))) < 0) return cli_err(cli, InternalError); - rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr); + rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr); if (rc) { free(hdr); return true; } - return cli_write_start(cli); + return atcp_write_start(&cli->wst); err_out: rc = txn->abort(txn); diff --git a/server/object.c b/server/object.c index 3801e94..b053ed9 100644 --- a/server/object.c +++ b/server/object.c @@ -227,13 +227,13 @@ bool object_del(struct client *cli, const char *user, hutil_time2str(timestr, sizeof(timestr), time(NULL))) < 0) return cli_err(cli, InternalError); - rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr); + rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr); if (rc) { free(hdr); return true; } - return cli_write_start(cli); + return atcp_write_start(&cli->wst); err_out: rc = txn->abort(txn); @@ -525,13 +525,13 @@ static bool object_put_end(struct client *cli) return cli_err(cli, InternalError); } - rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr); + rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr); if (rc) { free(hdr); return true; } - return 
cli_write_start(cli); + return atcp_write_start(&cli->wst); err_out_rb: rc = txn->abort(txn); @@ -812,8 +812,8 @@ static bool object_put_body(struct client *cli, const char *user, cli_out_end(cli); return cli_err(cli, InternalError); } - cli_writeq(cli, cont, strlen(cont), cli_cb_free, cont); - cli_write_start(cli); + atcp_writeq(&cli->wst, cont, strlen(cont), atcp_cb_free, cli, cont); + atcp_write_start(&cli->wst); } avail = MIN(cli_req_avail(cli), content_len); @@ -940,13 +940,13 @@ static bool object_put_acls(struct client *cli, const char *user, return cli_err(cli, InternalError); } - rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr); + rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr); if (rc) { free(hdr); return true; } - return cli_write_start(cli); + return atcp_write_start(&cli->wst); err_out_rb: rc = txn->abort(txn); @@ -990,10 +990,10 @@ void cli_in_end(struct client *cli) cli->in_len = 0; } -static bool object_get_more(struct client *cli, void *cb_data, bool done); +static bool object_get_more(void *, void *, bool); /* - * Return true iff cli_writeq was called. This is compatible with the + * Return true iff atcp_writeq was called. This is compatible with the * convention for cli continuation callbacks, so object_get_more can call us. 
*/ static bool object_get_poke(struct client *cli) @@ -1026,7 +1026,7 @@ static bool object_get_poke(struct client *cli) if (bytes == 0) { if (!cli->in_len) { cli_in_end(cli); - cli_write_start(cli); + atcp_write_start(&cli->wst); } free(buf); return false; @@ -1034,15 +1034,15 @@ static bool object_get_poke(struct client *cli) cli->in_len -= bytes; if (!cli->in_len) { - if (cli_writeq(cli, buf, bytes, cli_cb_free, buf)) + if (atcp_writeq(&cli->wst, buf, bytes, atcp_cb_free, cli, buf)) goto err_out; cli_in_end(cli); - cli_write_start(cli); + atcp_write_start(&cli->wst); } else { - if (cli_writeq(cli, buf, bytes, object_get_more, buf)) + if (atcp_writeq(&cli->wst, buf, bytes, object_get_more, cli, buf)) goto err_out; - if (cli_wqueued(cli) >= CLI_DATA_BUF_SZ) - cli_write_start(cli); + if (atcp_wqueued(&cli->wst) >= CLI_DATA_BUF_SZ) + atcp_write_start(&cli->wst); } return true; @@ -1053,11 +1053,12 @@ err_out: } /* callback from the client side: a queued write is being disposed */ -static bool object_get_more(struct client *cli, void *cb_data, bool done) +static bool object_get_more(void *cb_data1, void *cb_data2, bool done) { + struct client *cli = cb_data1; /* free now-written buffer */ - free(cb_data); + free(cb_data2); /* do not queue more, if !completion or fd was closed early */ if (!done) /* FIXME We used to test for input errors here. 
*/ @@ -1071,8 +1072,10 @@ static bool object_get_more(struct client *cli, void *cb_data, bool done) /* callback from the chunkd side: some data is available */ static void object_get_event(struct open_chunk *ochunk) { - object_get_poke(ochunk->cli); - cli_write_run_compl(); + struct client *cli = ochunk->cli; + + object_get_poke(cli); + atcp_write_run_compl(&cli->wst); } static int object_node_count_up(struct db_obj_ent *obj) @@ -1327,7 +1330,7 @@ static bool object_get_body(struct client *cli, const char *user, if (!want_body) { cli_in_end(cli); - rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr); + rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr); if (rc) { free(hdr); return true; @@ -1347,7 +1350,7 @@ static bool object_get_body(struct client *cli, const char *user, if (!cli->in_len) cli_in_end(cli); - rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr); + rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr); if (rc) { free(hdr); goto err_out_in_end; @@ -1365,21 +1368,22 @@ static bool object_get_body(struct client *cli, const char *user, goto err_out_in_end; memcpy(tmp, buf, bytes); - rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr); + rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr); if (rc) { free(hdr); free(tmp); return true; } - if (cli_writeq(cli, tmp, bytes, - cli->in_len ? object_get_more : cli_cb_free, tmp)) + if (atcp_writeq(&cli->wst, tmp, bytes, + cli->in_len ? 
object_get_more : atcp_cb_free, + cli, tmp)) goto err_out_in_end; start_write: free(obj); g_string_free(extra_hdr, TRUE); - return cli_write_start(cli); + return atcp_write_start(&cli->wst); err_out_in_end: cli_in_end(cli); diff --git a/server/server.c b/server/server.c index 7a9fb7a..f8c4540 100644 --- a/server/server.c +++ b/server/server.c @@ -56,8 +56,6 @@ const char *argp_program_version = PACKAGE_VERSION; enum { - CLI_MAX_WR_IOV = 32, /* max iov per writev(2) */ - SFL_FOREGROUND = (1 << 0), /* run in foreground */ }; @@ -488,65 +486,25 @@ bool stat_status(struct client *cli, GList *content) if (asprintf(&str, "<p>Stats: " - "poll %lu event %lu tcp_accept %lu opt_write %lu</p>\r\n" - "<p>Debug: max_write_buf %lu</p>\r\n", - tabled_srv.stats.poll, - tabled_srv.stats.event, - tabled_srv.stats.tcp_accept, - tabled_srv.stats.opt_write, - tabled_srv.stats.max_write_buf) < 0) + "poll %llu event %llu tcp_accept %llu opt_write %llu</p>\r\n" + "<p>Debug: max_write_buf %llu</p>\r\n", + (unsigned long long) tabled_srv.stats.poll, + (unsigned long long) tabled_srv.stats.event, + (unsigned long long) tabled_srv.stats.tcp_accept, + (unsigned long long) tabled_srv.stats.opt_write, + (unsigned long long) tabled_srv.stats.max_write_buf) < 0) return false; content = g_list_append(content, str); return true; } -static void cli_write_complete(struct client *cli, struct client_write *tmp) -{ - list_del(&tmp->node); - list_add_tail(&tmp->node, &tabled_srv.write_compl_q); -} - -static bool cli_write_free(struct client_write *tmp, bool done) -{ - struct client *cli = tmp->cb_cli; - bool rcb = false; - - cli->write_cnt -= tmp->length; - list_del(&tmp->node); - if (tmp->cb) - rcb = tmp->cb(cli, tmp->cb_data, done); - free(tmp); - - return rcb; -} - -static void cli_write_free_all(struct client *cli) -{ - struct client_write *wr, *tmp; - - cli_write_run_compl(); - list_for_each_entry_safe(wr, tmp, &cli->write_q, node) { - cli_write_free(wr, false); - } -} - -bool 
cli_write_run_compl(void) -{ - struct client_write *wr; - bool do_loop; - - do_loop = false; - while (!list_empty(&tabled_srv.write_compl_q)) { - wr = list_entry(tabled_srv.write_compl_q.next, - struct client_write, node); - do_loop |= cli_write_free(wr, true); - } - return do_loop; -} - static void cli_free(struct client *cli) { - cli_write_free_all(cli); + if (cli->wst.write_cnt_max > tabled_srv.stats.max_write_buf) + tabled_srv.stats.max_write_buf = cli->wst.write_cnt_max; + tabled_srv.stats.opt_write += cli->wst.opt_write; + + atcp_wr_exit(&cli->wst); cli_out_end(cli); cli_in_end(cli); @@ -561,9 +519,6 @@ static void cli_free(struct client *cli) hreq_free(&cli->req); - if (cli->write_cnt_max > tabled_srv.stats.max_write_buf) - tabled_srv.stats.max_write_buf = cli->write_cnt_max; - if (debugging) applog(LOG_INFO, "client %s ended", cli->addr_host); @@ -575,7 +530,7 @@ static bool cli_evt_dispose(struct client *cli, unsigned int events) /* if write queue is not empty, we should continue to get * poll callbacks here until it is */ - if (list_empty(&cli->write_q)) + if (list_empty(&cli->wst.write_q)) cli_free(cli); return false; @@ -608,134 +563,6 @@ static bool cli_evt_recycle(struct client *cli, unsigned int events) return true; } -static void cli_writable(struct client *cli) -{ - int n_iov; - struct client_write *tmp; - ssize_t rc; - struct iovec iov[CLI_MAX_WR_IOV]; - - /* accumulate pending writes into iovec */ - n_iov = 0; - list_for_each_entry(tmp, &cli->write_q, node) { - if (n_iov == CLI_MAX_WR_IOV) - break; - /* bleh, struct iovec should declare iov_base const */ - iov[n_iov].iov_base = (void *) tmp->buf; - iov[n_iov].iov_len = tmp->togo; - n_iov++; - } - - /* execute non-blocking write */ -do_write: - rc = writev(cli->fd, iov, n_iov); - if (rc < 0) { - if (errno == EINTR) - goto do_write; - if (errno != EAGAIN) - goto err_out; - return; - } - - /* iterate through write queue, issuing completions based on - * amount of data written - */ - while (rc > 0) { 
- int sz; - - /* get pointer to first record on list */ - tmp = list_entry(cli->write_q.next, struct client_write, node); - - /* mark data consumed by decreasing tmp->len */ - sz = (tmp->togo < rc) ? tmp->togo : rc; - tmp->togo -= sz; - tmp->buf += sz; - rc -= sz; - - /* if tmp->len reaches zero, write is complete, - * so schedule it for clean up (cannot call callback - * right away or an endless recursion will result) - */ - if (tmp->togo == 0) - cli_write_complete(cli, tmp); - } - - /* if we emptied the queue, clear write notification */ - if (list_empty(&cli->write_q)) { - cli->writing = false; - if (event_del(&cli->write_ev) < 0) { - applog(LOG_WARNING, "cli_writable event_del"); - goto err_out; - } - } - - return; - -err_out: - cli->state = evt_dispose; - cli_write_free_all(cli); -} - -bool cli_write_start(struct client *cli) -{ - if (list_empty(&cli->write_q)) - return true; /* loop, not poll */ - - /* if write-poll already active, nothing further to do */ - if (cli->writing) - return false; /* poll wait */ - - /* attempt optimistic write, in hopes of avoiding poll, - * or at least refill the write buffers so as to not - * get -immediately- called again by the kernel - */ - cli_writable(cli); - if (list_empty(&cli->write_q)) { - tabled_srv.stats.opt_write++; - return true; /* loop, not poll */ - } - - if (event_add(&cli->write_ev, NULL) < 0) { - applog(LOG_WARNING, "cli_write event_add"); - return true; /* loop, not poll */ - } - - cli->writing = true; - - return false; /* poll wait */ -} - -int cli_writeq(struct client *cli, const void *buf, unsigned int buflen, - cli_write_func cb, void *cb_data) -{ - struct client_write *wr; - - if (!buf || !buflen) - return -EINVAL; - - wr = calloc(1, sizeof(struct client_write)); - if (!wr) - return -ENOMEM; - - wr->buf = buf; - wr->togo = buflen; - wr->length = buflen; - wr->cb = cb; - wr->cb_data = cb_data; - wr->cb_cli = cli; - list_add_tail(&wr->node, &cli->write_q); - cli->write_cnt += buflen; - if (cli->write_cnt > 
cli->write_cnt_max) - cli->write_cnt_max = cli->write_cnt; - - return 0; -} - -size_t cli_wqueued(struct client *cli) -{ - return cli->write_cnt; -} - /* * Return: * 0: progress was NOT made (EOF) @@ -771,12 +598,6 @@ do_read: return rc != 0; } -bool cli_cb_free(struct client *cli, void *cb_data, bool done) -{ - free(cb_data); - return false; -} - static int cli_write_list(struct client *cli, GList *list) { int rc = 0; @@ -784,8 +605,8 @@ static int cli_write_list(struct client *cli, GList *list) tmp = list; while (tmp) { - rc = cli_writeq(cli, tmp->data, strlen(tmp->data), - cli_cb_free, tmp->data); + rc = atcp_writeq(&cli->wst, tmp->data, strlen(tmp->data), + atcp_cb_free, cli, tmp->data); if (rc) goto out; @@ -870,14 +691,14 @@ bool cli_err_write(struct client *cli, char *hdr, char *content) cli->state = evt_dispose; - rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr); + rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr); if (rc) return true; - rc = cli_writeq(cli, content, strlen(content), cli_cb_free, content); + rc = atcp_writeq(&cli->wst, content, strlen(content), atcp_cb_free, cli, content); if (rc) return true; - return cli_write_start(cli); + return atcp_write_start(&cli->wst); } static bool cli_resp(struct client *cli, int http_status, @@ -911,7 +732,7 @@ static bool cli_resp(struct client *cli, int http_status, else cli->state = evt_recycle; - rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr); + rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr); if (rc) { free(hdr); cli->state = evt_dispose; @@ -924,7 +745,7 @@ static bool cli_resp(struct client *cli, int http_status, return true; } - rcb = cli_write_start(cli); + rcb = atcp_write_start(&cli->wst); if (cli->state == evt_recycle) return true; @@ -1385,9 +1206,10 @@ static struct client *cli_alloc(bool is_status) return NULL; } + atcp_wr_init(&cli->wst); + cli->state = evt_read_req; cli->evt_table = is_status? 
evt_funcs_status: evt_funcs_server; - INIT_LIST_HEAD(&cli->write_q); INIT_LIST_HEAD(&cli->out_ch); cli->req_ptr = cli->req_buf; memset(&cli->req, 0, sizeof(cli->req) - sizeof(cli->req.hdr)); @@ -1395,14 +1217,6 @@ static struct client *cli_alloc(bool is_status) return cli; } -static void tcp_cli_wr_event(int fd, short events, void *userdata) -{ - struct client *cli = userdata; - - cli_writable(cli); - cli_write_run_compl(); -} - static void tcp_cli_event(int fd, short events, void *userdata) { struct client *cli = userdata; @@ -1410,7 +1224,7 @@ static void tcp_cli_event(int fd, short events, void *userdata) do { loop = cli->evt_table[cli->state](cli, events); - loop |= cli_write_run_compl(); + loop |= atcp_write_run_compl(&cli->wst); } while (loop); } @@ -1438,11 +1252,11 @@ static void tcp_srv_event(int fd, short events, void *userdata) goto err_out; } + atcp_wr_set_fd(&cli->wst, cli->fd); + tabled_srv.stats.tcp_accept++; event_set(&cli->ev, cli->fd, EV_READ | EV_PERSIST, tcp_cli_event, cli); - event_set(&cli->write_ev, cli->fd, EV_WRITE | EV_PERSIST, - tcp_cli_wr_event, cli); /* mark non-blocking, for upcoming poll use */ if (fsetflags("tcp client", cli->fd, O_NONBLOCK) < 0) @@ -2202,7 +2016,6 @@ int main (int argc, char *argv[]) struct event_base *event_base_rep; INIT_LIST_HEAD(&tabled_srv.all_stor); - INIT_LIST_HEAD(&tabled_srv.write_compl_q); tabled_srv.state_tdb = ST_TDB_INIT; tabled_srv.rep_next_id = DBID_MIN; diff --git a/server/tabled.h b/server/tabled.h index d4d2048..be6c9b4 100644 --- a/server/tabled.h +++ b/server/tabled.h @@ -31,6 +31,7 @@ #include <elist.h> #include <tdb.h> #include <hail_log.h> +#include "atcp.h" #ifndef ARRAY_SIZE #define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0])) @@ -103,19 +104,6 @@ struct storage_node { }; typedef bool (*cli_evt_func)(struct client *, unsigned int); -typedef bool (*cli_write_func)(struct client *, void *, bool); - -struct client_write { - const void *buf; /* write buffer pointer */ - int togo; /* write 
buffer remainder */ - - int length; /* length for accounting */ - cli_write_func cb; /* callback */ - void *cb_data; /* data passed to cb */ - struct client *cb_cli; /* cli passed to cb */ - - struct list_head node; -}; /* an open chunkd client */ struct open_chunk { @@ -163,13 +151,9 @@ struct client { int fd; /* socket */ bool ev_active; struct event ev; - struct event write_ev; - struct list_head write_q; /* list of async writes */ - size_t write_cnt; /* water level */ - bool writing; + struct atcp_wr_state wst; /* some debugging stats */ - size_t write_cnt_max; unsigned int req_used; /* amount of req_buf in use */ char *req_ptr; /* start of unexamined data */ @@ -216,12 +200,12 @@ enum st_net { }; struct server_stats { - unsigned long poll; /* number polls */ - unsigned long event; /* events dispatched */ - unsigned long tcp_accept; /* TCP accepted cxns */ - unsigned long opt_write; /* optimistic writes */ + uint64_t poll; /* number polls */ + uint64_t event; /* events dispatched */ + uint64_t tcp_accept; /* TCP accepted cxns */ + uint64_t opt_write; /* optimistic writes */ - unsigned long max_write_buf; + uint64_t max_write_buf; }; #define DBID_NONE 0 @@ -249,7 +233,6 @@ struct server { struct event_base *evbase_main; int ev_pipe[2]; struct event pevt; - struct list_head write_compl_q; /* list of done writes */ bool mc_delay; struct event mc_timer; @@ -398,12 +381,6 @@ extern bool cli_err(struct client *cli, enum errcode code); extern bool cli_err_write(struct client *cli, char *hdr, char *content); extern bool cli_resp_xml(struct client *cli, int http_status, GList *content); extern bool cli_resp_html(struct client *cli, int http_status, GList *content); -extern int cli_writeq(struct client *cli, const void *buf, unsigned int buflen, - cli_write_func cb, void *cb_data); -extern size_t cli_wqueued(struct client *cli); -extern bool cli_cb_free(struct client *cli, void *cb_data, bool done); -extern bool cli_write_start(struct client *cli); -extern bool 
cli_write_run_compl(void); extern int cli_req_avail(struct client *cli); extern void applog(int prio, const char *fmt, ...); extern void cld_update_cb(void); -- To unsubscribe from this list: send the line "unsubscribe hail-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html