On 05/01/2010 10:56 PM, Pete Zaitcev wrote:
On Sat, 01 May 2010 18:28:42 -0400
Jeff Garzik<jeff@xxxxxxxxxx> wrote:
As I write this email, I am borrowing a lot of networking code from
tabled, to convert from GNet over to the more-flexible TCP server
codebase found in tabled -- notably the asynchronous background TCP
writing code in tabled. Hopefully will finish and commit this by the
end of the weekend.
This seems to be crying out for a common repository or something like libhail,
not sure what. Remember the timer case. Eventually we'll make changes
to tabled that itd will need to copy. But I don't know what course
is best.
I was definitely thinking along those lines, when I abstracted and
modularized the code a bit. See attached... I put all the TCP
write-related code into two structures, tcp_write_state and tcp_write.
The code received s/cli_wr/tcp_wr/g and other obvious, cosmetic changes.
libhail definitely seems like the direction to go. It would be easiest
from a packaging perspective to put it into CLD. But maybe it deserves
its own repo?
Jeff
====================================SNIP CUT HERE SNIP=========================
/* Limits used by the queued TCP writer; both values are placeholders. */
enum {
	TCP_MAX_WR_IOV = 512, /* max iovec entries gathered for one writev() call; arbitrary, pick better one */
	TCP_MAX_WR_CNT = 10000,/* initial value given to write_cnt_max by tcp_write_init(); arbitrary, pick better one */
};
/*
 * State for queued, event-driven writes to one file descriptor.
 * Initialised by tcp_write_init() and torn down by tcp_write_exit().
 */
struct tcp_write_state {
	/* descriptor passed to tcp_write_init(); target of writev() */
	int fd;
	/* tcp_write records not yet completely written */
	struct list_head write_q;
	/* completely written records waiting for tcp_write_run_compl() */
	struct list_head write_compl_q;
	size_t write_cnt; /* water level: bytes queued by tcp_writeq() and not yet freed */
	/* raised by tcp_writeq() whenever write_cnt exceeds it */
	size_t write_cnt_max;
	/* true while write_ev has been added to the event loop */
	bool writing;
	/* EV_WRITE | EV_PERSIST event on fd, dispatched to the writer */
	struct event write_ev;
	void *priv; /* useable by any app */
	/* stats */
	/* times tcp_write_start() found the queue empty after its optimistic write */
	unsigned long opt_write;
};
/*
 * One queued write request. Allocated by tcp_writeq() and released
 * after its callback (if any) has been invoked.
 */
struct tcp_write {
	const void *buf; /* write buffer pointer; advanced past data already written */
	int togo; /* write buffer remainder, in bytes */
	int length; /* length for accounting: the size originally queued */
	/* callback */
	/*
	 * Called once when the record is freed, with the owning state,
	 * cb_data, and true if the data was completely written (false if
	 * the record was discarded while still pending). May be NULL.
	 */
	bool (*cb)(struct tcp_write_state *, void *, bool);
	void *cb_data; /* data passed to cb */
	/* links the record into write_q or write_compl_q */
	struct list_head node;
};
/*
 * Public interface of the queued TCP writer.
 */

/*
 * Queue buflen bytes starting at buf for writing. The buffer is
 * referenced, not copied. cb (may be NULL) is called with cb_data when
 * the record is released. Returns 0 on success or a negative errno
 * value (-EINVAL, -ENOMEM).
 */
extern int tcp_writeq(struct tcp_write_state *st, const void *buf, unsigned int buflen,
		      bool (*cb)(struct tcp_write_state *, void *, bool),
		      void *cb_data);

/* Ready-made callback that free()s cb_data and returns false. */
extern bool tcp_wr_cb_free(struct tcp_write_state *st, void *cb_data, bool done);

/* Prepare st for writing to fd; must precede any other call on st. */
extern void tcp_write_init(struct tcp_write_state *st, int fd);

/* Stop write polling and release every queued record. */
extern void tcp_write_exit(struct tcp_write_state *st);

/*
 * Try to push queued data out. Returns true when the caller should keep
 * looping, false when it should wait for the write event.
 */
extern bool tcp_write_start(struct tcp_write_state *st);

/*
 * Run the callbacks of all completed writes and free them. Returns true
 * if any callback returned true.
 */
extern bool tcp_write_run_compl(struct tcp_write_state *st);

/* Number of bytes currently accounted as queued on st. */
extern size_t tcp_wqueued(struct tcp_write_state *st);
====================================SNIP CUT HERE SNIP=========================
/*
 * Retire a fully written record: unlink it from whatever list it is on
 * and append it to the completion queue. The callback is not run here.
 */
static void tcp_write_complete(struct tcp_write_state *st, struct tcp_write *tmp)
{
	struct list_head *entry = &tmp->node;
	struct list_head *done_q = &st->write_compl_q;

	list_del(entry);
	list_add_tail(entry, done_q);
}
/*
 * Stock write callback: hands cb_data to free() and always returns
 * false. Neither st nor done is consulted.
 */
bool tcp_wr_cb_free(struct tcp_write_state *st, void *cb_data, bool done)
{
	(void) st;
	(void) done;

	free(cb_data);

	return false;
}
/*
 * Release one write record: subtract its length from the water level,
 * unlink it, invoke its callback (if set) with the given done flag,
 * then free the record itself.
 *
 * Returns the callback's result, or false when there is no callback.
 */
static bool tcp_write_free(struct tcp_write_state *st, struct tcp_write *tmp,
			   bool done)
{
	bool cb_result;

	st->write_cnt -= tmp->length;
	list_del(&tmp->node);

	cb_result = tmp->cb ? tmp->cb(st, tmp->cb_data, done) : false;

	free(tmp);

	return cb_result;
}
static void tcp_write_free_all(struct tcp_write_state *st)
{
struct tcp_write *wr, *tmp;
list_for_each_entry_safe(wr, tmp, &st->write_compl_q, node) {
tcp_write_free(st, wr, true);
}
list_for_each_entry_safe(wr, tmp, &st->write_q, node) {
tcp_write_free(st, wr, false);
}
}
bool tcp_write_run_compl(struct tcp_write_state *st)
{
struct tcp_write *wr;
bool do_loop;
do_loop = false;
while (!list_empty(&st->write_compl_q)) {
wr = list_entry(st->write_compl_q.next, struct tcp_write,
node);
do_loop |= tcp_write_free(st, wr, true);
}
return do_loop;
}
/*
 * Push as much queued data as possible to st->fd with a single writev().
 *
 * Up to TCP_MAX_WR_IOV pending records are gathered into an iovec.
 * Records that end up fully written are moved to the completion queue;
 * a partially written record keeps its place with buf/togo adjusted.
 * When the pending queue becomes empty, write polling is switched off.
 *
 * Returns true on success, including the case where the descriptor
 * would block (EAGAIN). Returns false on any other writev() error or
 * if event_del() fails; in that case every record in st has been freed.
 */
static bool tcp_writable(struct tcp_write_state *st)
{
	int n_iov;
	struct tcp_write *tmp;
	ssize_t rc;
	struct iovec iov[TCP_MAX_WR_IOV];

	/* accumulate pending writes into iovec */
	n_iov = 0;
	list_for_each_entry(tmp, &st->write_q, node) {
		if (n_iov == TCP_MAX_WR_IOV)
			break;

		/* bleh, struct iovec should declare iov_base const */
		iov[n_iov].iov_base = (void *) tmp->buf;
		iov[n_iov].iov_len = tmp->togo;
		n_iov++;
	}

	/*
	 * execute non-blocking write, retrying if interrupted by a signal.
	 * With nothing queued the syscall is skipped: POSIX permits
	 * writev() to fail with EINVAL when iovcnt is 0.
	 */
	rc = 0;
	if (n_iov > 0) {
		do {
			rc = writev(st->fd, iov, n_iov);
		} while (rc < 0 && errno == EINTR);

		if (rc < 0) {
			if (errno != EAGAIN)
				goto err_out;
			return true;
		}
	}

	/* iterate through write queue, issuing completions based on
	 * amount of data written
	 */
	while (rc > 0) {
		int sz;

		/* get pointer to first record on list */
		tmp = list_entry(st->write_q.next, struct tcp_write, node);

		/* mark data consumed by decreasing tmp->togo */
		sz = (tmp->togo < rc) ? tmp->togo : rc;
		tmp->togo -= sz;
		/* arithmetic on void * is not ISO C; step in bytes */
		tmp->buf = (const char *) tmp->buf + sz;
		rc -= sz;

		/* if tmp->togo reaches zero, write is complete,
		 * so schedule it for clean up (cannot call callback
		 * right away or an endless recursion will result)
		 */
		if (tmp->togo == 0)
			tcp_write_complete(st, tmp);
	}

	/* if we emptied the queue, clear write notification */
	if (list_empty(&st->write_q)) {
		st->writing = false;
		if (event_del(&st->write_ev) < 0)
			goto err_out;
	}

	return true;

err_out:
	tcp_write_free_all(st);
	return false;
}
/*
 * Kick off transmission of queued data.
 *
 * Returns true when the caller should loop rather than poll: nothing
 * is queued, the optimistic write drained the queue, or the write
 * event could not be added. Returns false when the caller should wait
 * for the write event (already armed, or armed by this call).
 */
bool tcp_write_start(struct tcp_write_state *st)
{
	struct list_head *pending = &st->write_q;

	/* nothing to send: loop, not poll */
	if (list_empty(pending))
		return true;

	/* write-poll already active: poll wait, nothing further to do */
	if (st->writing)
		return false;

	/* optimistic write, in hopes of avoiding poll altogether, or at
	 * least of refilling the write buffers so the kernel does not
	 * call us back -immediately-
	 */
	(void) tcp_writable(st);

	if (list_empty(pending)) {
		st->opt_write++;
		return true;		/* loop, not poll */
	}

	if (event_add(&st->write_ev, NULL) >= 0) {
		st->writing = true;
		return false;		/* poll wait */
	}

	return true;			/* loop, not poll */
}
/*
 * Append a write request to st's pending queue.
 *
 * @st:      writer state set up by tcp_write_init()
 * @buf:     data to send; the pointer is stored, the data is not copied
 * @buflen:  number of bytes at buf; must be non-zero and fit in an int
 * @cb:      optional callback run when the record is released
 * @cb_data: opaque pointer handed to cb
 *
 * Nothing is written here; the request is only queued and accounted in
 * write_cnt (raising write_cnt_max if the new level exceeds it).
 *
 * Returns 0 on success, -EINVAL for a NULL buffer, a zero length or a
 * length too large for the int-sized bookkeeping fields, and -ENOMEM
 * if the record cannot be allocated.
 */
int tcp_writeq(struct tcp_write_state *st, const void *buf, unsigned int buflen,
	       bool (*cb)(struct tcp_write_state *, void *, bool),
	       void *cb_data)
{
	struct tcp_write *wr;

	if (!buf || !buflen)
		return -EINVAL;

	/* togo and length are plain int: a larger value would turn
	 * negative and later become a huge iov_len, so refuse it.
	 * (~0U >> 1 is the largest non-negative int value.)
	 */
	if (buflen > (~0U >> 1))
		return -EINVAL;

	wr = calloc(1, sizeof *wr);
	if (!wr)
		return -ENOMEM;

	wr->buf = buf;
	wr->togo = (int) buflen;
	wr->length = (int) buflen;
	wr->cb = cb;
	wr->cb_data = cb_data;
	list_add_tail(&wr->node, &st->write_q);

	st->write_cnt += buflen;
	if (st->write_cnt > st->write_cnt_max)
		st->write_cnt_max = st->write_cnt;

	return 0;
}
/* Report the current water level: st->write_cnt, unchanged. */
size_t tcp_wqueued(struct tcp_write_state *st)
{
	const size_t queued = st->write_cnt;

	return queued;
}
/*
 * Event callback registered by tcp_write_init(). userdata is the
 * tcp_write_state supplied at registration; fd and events are unused.
 * The result of the write attempt is discarded.
 */
static void tcp_wr_evt(int fd, short events, void *userdata)
{
	(void) fd;
	(void) events;

	tcp_writable((struct tcp_write_state *) userdata);
}
/*
 * Bring st to its initial state for descriptor fd: all fields zeroed,
 * both queues empty, write_cnt_max set to TCP_MAX_WR_CNT, and the
 * persistent write event prepared (but not yet added).
 */
void tcp_write_init(struct tcp_write_state *st, int fd)
{
	/* wipe everything first so counters, flags and priv start at zero */
	memset(st, 0, sizeof(struct tcp_write_state));

	INIT_LIST_HEAD(&st->write_compl_q);
	INIT_LIST_HEAD(&st->write_q);

	st->write_cnt_max = TCP_MAX_WR_CNT;
	st->fd = fd;

	/* st itself is the argument tcp_wr_evt() receives */
	event_set(&st->write_ev, st->fd, EV_WRITE | EV_PERSIST,
		  tcp_wr_evt, st);
}
/*
 * Shut the writer down: remove the write event if polling is active,
 * then release every record on both queues (running their callbacks).
 */
void tcp_write_exit(struct tcp_write_state *st)
{
	const bool polling = st->writing;

	if (polling)
		event_del(&st->write_ev);

	tcp_write_free_all(st);
}
====================================SNIP CUT HERE SNIP=========================