Just committed into libhail... renamed the include to 'anet.h' for 'asynchronous networking'. include/Makefile.am | 2 include/anet.h | 111 +++++++++++++++++++++++ lib/Makefile.am | 1 lib/atcp.c | 241 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 354 insertions(+), 1 deletion(-) commit 22de683a8f0566852818fac8b54ca26ae46490f0 Author: Jeff Garzik <jeff@xxxxxxxxxx> Date: Thu Sep 23 20:17:56 2010 -0400 libhail: add async TCP network writing API, atcp_wr* Signed-off-by: Jeff Garzik <jgarzik@xxxxxxxxxx> diff --git a/include/Makefile.am b/include/Makefile.am index 234cf8a..967352a 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -5,5 +5,5 @@ EXTRA_DIST = \ include_HEADERS = \ cldc.h cld_common.h ncld.h chunkc.h chunk_msg.h \ - hail_log.h hstor.h + hail_log.h hstor.h anet.h diff --git a/include/anet.h b/include/anet.h new file mode 100644 index 0000000..5c216c7 --- /dev/null +++ b/include/anet.h @@ -0,0 +1,111 @@ +#ifndef __ANET_H__ +#define __ANET_H__ + +/* + * Copyright 2010 Red Hat, Inc. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; see the file COPYING. If not, write to + * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 
+ * + */ + +#include <stdint.h> +#include <stdbool.h> +#include <sys/time.h> +#include <elist.h> + +enum { + ATCP_MAX_WR_IOV = 32, /* max iov per writev(2) */ +}; + +typedef void (*atcp_ev_func)(int, short, void *); + +struct atcp_wr_ops { + int (*ev_wset)(void *, int, atcp_ev_func, void *); /* called as (ev_info, fd, handler, handler arg) */ + int (*ev_add)(void *, const struct timeval *); /* called as (ev_info, timeout); atcp passes a NULL timeout; < 0 is failure */ + int (*ev_del)(void *); /* called as (ev_info); < 0 is failure */ +}; + +struct atcp_wr_state { + int fd; /* our socket */ + + bool writing; /* actively trying to write? */ + + size_t write_cnt; /* octets currently queued (water level) */ + size_t write_cnt_max; /* highest write_cnt seen so far */ + + struct list_head write_q; /* list of async writes */ + struct list_head write_compl_q; /* list of done writes */ + + void *priv; /* untouched by atcp */ + + /* various statistics */ + uint64_t opt_write; /* optimistic writes that emptied the queue without polling */ + + const struct atcp_wr_ops *ops; + void *ev_info; /* passed to ops->ev_* */ +}; + +typedef bool (*atcp_write_func)(struct atcp_wr_state *, void *, bool); /* invoked as (wst, cb_data, done) */ + +struct atcp_write { + const void *buf; /* write buffer pointer */ + int togo; /* write buffer remainder */ + + int length; /* original length, subtracted from write_cnt when freed */ + atcp_write_func cb; /* callback */ + void *cb_data; /* data passed to cb */ + + struct atcp_wr_state *wst; /* our parent */ + + struct list_head node; /* write_[compl_]q list node */ +}; + +/* setup and teardown atcp write state */ +extern void atcp_wr_exit(struct atcp_wr_state *wst); +extern void atcp_wr_init(struct atcp_wr_state *wst, + const struct atcp_wr_ops *ops, void *ev_info, + void *priv); + +/* generic write callback that calls free(cb_data) and returns false */ +extern bool atcp_cb_free(struct atcp_wr_state *wst, void *cb_data, bool done); + +/* clear all write queues immediately, even if not complete */ +extern void atcp_write_free_all(struct atcp_wr_state *wst); + +/* complete all writes found on completion queue; true if any callback returned true */ +extern bool atcp_write_run_compl(struct atcp_wr_state *wst); + +/* initialize internal fd, event setup */ +extern void atcp_wr_set_fd(struct atcp_wr_state *wst, int fd); + +/* add a buffer to 
the write queue */ +extern int atcp_writeq(struct atcp_wr_state *wst, const void *buf, unsigned int buflen, + atcp_write_func cb, void *cb_data); + +/* begin pushing write queue to socket; returns true when the caller should loop rather than poll, false when it should wait for the poll event */ +extern bool atcp_write_start(struct atcp_wr_state *wst); + +/* is the write queue empty at the moment? (true when nothing is queued) */ +static inline bool atcp_wq_empty(struct atcp_wr_state *wst) +{ + return list_empty(&wst->write_q) ? true : false; +} + +/* total number of octets queued at this moment */ +static inline size_t atcp_wqueued(struct atcp_wr_state *wst) +{ + return wst->write_cnt; +} + +#endif /* __ANET_H__ */ diff --git a/lib/Makefile.am b/lib/Makefile.am index f7b27ff..616b881 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -21,6 +21,7 @@ LINK = $(LIBTOOL) --mode=link $(CC) $(CFLAGS) $(LDFLAGS) -o $@ lib_LTLIBRARIES = libhail.la libhail_la_SOURCES = \ + atcp.c \ cldc.c \ cldc-udp.c \ cldc-dns.c \ diff --git a/lib/atcp.c b/lib/atcp.c new file mode 100644 index 0000000..dfdb954 --- /dev/null +++ b/lib/atcp.c @@ -0,0 +1,241 @@ + +/* + * Copyright 2010 Red Hat, Inc. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; see the file COPYING. If not, write to + * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 
+ * + */ + +#include "hail-config.h" + +#include <string.h> +#include <stdlib.h> +#include <errno.h> +#include <sys/uio.h> +#include <anet.h> + +bool atcp_cb_free(struct atcp_wr_state *wst, void *cb_data, bool done) +{ + free(cb_data); /* cb_data itself is the allocation to release; wst and done are unused */ + return false; /* this value is OR-ed into atcp_write_run_compl()'s result */ +} + +static void atcp_write_complete(struct atcp_write *tmp) +{ + struct atcp_wr_state *wst = tmp->wst; + + list_del(&tmp->node); /* off write_q ... */ + list_add_tail(&tmp->node, &wst->write_compl_q); /* ... onto the completion queue; callback runs later from atcp_write_run_compl() */ +} + +static bool atcp_write_free(struct atcp_write *tmp, bool done) +{ + struct atcp_wr_state *wst = tmp->wst; + bool rcb = false; /* result when no callback is set */ + + wst->write_cnt -= tmp->length; /* drop this write from the queued-octet count */ + list_del_init(&tmp->node); /* unlink before the callback runs */ + if (tmp->cb) + rcb = tmp->cb(wst, tmp->cb_data, done); + free(tmp); /* only the queue record is freed here; cb_data is left to the callback */ + + return rcb; +} + +bool atcp_write_run_compl(struct atcp_wr_state *wst) +{ + struct atcp_write *wr; + bool do_loop; + + do_loop = false; + while (!list_empty(&wst->write_compl_q)) { /* atcp_write_free() unlinks the head, so this drains the queue */ + wr = list_entry(wst->write_compl_q.next, + struct atcp_write, node); + do_loop |= atcp_write_free(wr, true); /* done=true: entry was moved here after being fully written */ + } + return do_loop; +} + +void atcp_write_free_all(struct atcp_wr_state *wst) +{ + struct atcp_write *wr, *tmp; + + atcp_write_run_compl(wst); /* finished writes first, reported with done=true */ + list_for_each_entry_safe(wr, tmp, &wst->write_q, node) { + atcp_write_free(wr, false); /* done=false: dropped while still on write_q */ + } +} + +static bool atcp_writable(struct atcp_wr_state *wst) +{ + int n_iov; + struct atcp_write *tmp; + ssize_t rc; + struct iovec iov[ATCP_MAX_WR_IOV]; + + /* accumulate pending writes into iovec */ + n_iov = 0; + list_for_each_entry(tmp, &wst->write_q, node) { + if (n_iov == ATCP_MAX_WR_IOV) + break; + /* bleh, struct iovec should declare iov_base const */ + iov[n_iov].iov_base = (void *) tmp->buf; + iov[n_iov].iov_len = tmp->togo; + n_iov++; + } + + /* execute non-blocking write, retrying if interrupted (EINTR) */ +do_write: + rc = writev(wst->fd, iov, n_iov); + if (rc < 0) { + if (errno == EINTR) + goto do_write; + if (errno != EAGAIN) + goto err_out; + return true; /* EAGAIN: nothing written this time, queue left intact */ + } + + /* iterate through write queue, issuing completions based on + * amount of data written + */ + while (rc > 0) { + 
int sz; + + /* get pointer to first record on list */ + tmp = list_entry(wst->write_q.next, struct atcp_write, node); + + /* mark data consumed by decreasing tmp->len */ + sz = (tmp->togo < rc) ? tmp->togo : rc; + tmp->togo -= sz; + tmp->buf += sz; + rc -= sz; + + /* if tmp->len reaches zero, write is complete, + * so schedule it for clean up (cannot call callback + * right away or an endless recursion will result) + */ + if (tmp->togo == 0) + atcp_write_complete(tmp); + } + + /* if we emptied the queue, clear write notification */ + if (atcp_wq_empty(wst)) { + wst->writing = false; + if (wst->ops->ev_del(wst->ev_info) < 0) + goto err_out; + } + + return true; + +err_out: + atcp_write_free_all(wst); + return false; +} + +static void atcp_wr_event(int fd, short events, void *userdata) +{ + struct atcp_wr_state *wst = userdata; + + atcp_writable(wst); + atcp_write_run_compl(wst); +} + +void atcp_wr_set_fd(struct atcp_wr_state *wst, int fd) +{ + wst->fd = fd; + + wst->ops->ev_wset(wst->ev_info, wst->fd, + atcp_wr_event, wst); +} + +bool atcp_write_start(struct atcp_wr_state *wst) +{ + if (atcp_wq_empty(wst)) + return true; /* loop, not poll */ + + /* if write-poll already active, nothing further to do */ + if (wst->writing) + return false; /* poll wait */ + + /* attempt optimistic write, in hopes of avoiding poll, + * or at least refill the write buffers so as to not + * get -immediately- called again by the kernel + */ + atcp_writable(wst); + if (atcp_wq_empty(wst)) { + wst->opt_write++; + return true; /* loop, not poll */ + } + + if (wst->ops->ev_add(wst->ev_info, NULL) < 0) + return true; /* loop, not poll */ + + wst->writing = true; + + return false; /* poll wait */ +} + +int atcp_writeq(struct atcp_wr_state *wst, const void *buf, unsigned int buflen, + atcp_write_func cb, void *cb_data) +{ + struct atcp_write *wr; + + if (!buf || !buflen) + return -EINVAL; + + wr = calloc(1, sizeof(struct atcp_write)); + if (!wr) + return -ENOMEM; + + wr->buf = buf; + wr->togo = 
buflen; + wr->length = buflen; /* kept unchanged for write_cnt accounting */ + wr->cb = cb; + wr->cb_data = cb_data; + wr->wst = wst; + list_add_tail(&wr->node, &wst->write_q); /* queued only; nothing is written to the socket here */ + wst->write_cnt += buflen; + if (wst->write_cnt > wst->write_cnt_max) + wst->write_cnt_max = wst->write_cnt; /* track the high-water mark */ + + return 0; +} + +void atcp_wr_exit(struct atcp_wr_state *wst) +{ + if (!wst) + return; /* NULL state is accepted and ignored */ + + if (wst->writing) + wst->ops->ev_del(wst->ev_info); /* result ignored during teardown */ + + atcp_write_free_all(wst); /* runs callbacks for both completed and still-pending writes */ +} + +void atcp_wr_init(struct atcp_wr_state *wst, + const struct atcp_wr_ops *ops, void *ev_info, + void *priv) +{ + memset(wst, 0, sizeof(*wst)); /* also zeroes the counters and the writing flag */ + + INIT_LIST_HEAD(&wst->write_q); + INIT_LIST_HEAD(&wst->write_compl_q); + + wst->fd = -1; /* no socket until atcp_wr_set_fd() */ + + wst->ops = ops; + wst->ev_info = ev_info; + wst->priv = priv; +} + -- To unsubscribe from this list: send the line "unsubscribe hail-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html