Re: iSCSI front-end for Hail

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On 05/01/2010 10:56 PM, Pete Zaitcev wrote:
On Sat, 01 May 2010 18:28:42 -0400
Jeff Garzik<jeff@xxxxxxxxxx>  wrote:

As I write this email, I am borrowing a lot of networking code from
tabled, to convert from GNet over to the more-flexible TCP server
codebase found in tabled -- notably the asynchronous background TCP
writing code in tabled.  Hopefully will finish and commit this by the
end of the weekend.

This seems crying for a common repository or something like libhail,
not sure what. Remember the timer case. Eventually we'll make changes
to tabled that itd will need to copy. But I don't know what course
is best.

I was definitely thinking along those lines, when I abstracted and modularized the code a bit. See attached... I put all the TCP write-related code into two structures, tcp_write_state and tcp_write. The code received s/cli_wr/tcp_wr/g and other obvious, cosmetic changes.

libhail definitely seems like the direction to go. It would be easiest from a packaging perspective to put it into CLD. But maybe it deserves its own repo?

	Jeff



====================================SNIP CUT HERE SNIP=========================

/* Tunable limits for the async TCP write queue. */
enum {
	TCP_MAX_WR_IOV		= 512,	/* max iovec entries per writev(2) call; arbitrary, pick better one */
	TCP_MAX_WR_CNT		= 10000,/* default write_cnt_max watermark (bytes); arbitrary, pick better one */
};

/* Per-connection state for asynchronous, queued TCP writes.
 * Initialize with tcp_write_init(), tear down with tcp_write_exit().
 */
struct tcp_write_state {
	int 			fd;		/* socket to write to; presumably non-blocking — confirm in caller */
	struct list_head	write_q;	/* pending struct tcp_write entries, FIFO */
	struct list_head	write_compl_q;	/* fully-written entries awaiting callback */
	size_t			write_cnt;	/* water level: bytes currently queued */
	size_t			write_cnt_max;	/* high-water mark observed (seeded with TCP_MAX_WR_CNT) */
	bool			writing;	/* true while write_ev is registered for EV_WRITE */
	struct event		write_ev;	/* libevent write-readiness event */

	void			*priv;		/* useable by any app */

	/* stats */
	unsigned long		opt_write;	/* count of optimistic writes that emptied the queue */
};

/* One queued write request. The caller's buffer is NOT copied; it must
 * stay valid until the callback fires (or the state is torn down).
 */
struct tcp_write {
	const void		*buf;		/* write buffer pointer (advances as data is sent) */
	int			togo;		/* write buffer remainder, bytes not yet written */

	int			length;		/* original length, for write_cnt accounting */

						/* completion callback: (state, cb_data, done);
						 * done=false means the write was aborted */
	bool			(*cb)(struct tcp_write_state *, void *, bool);
	void			*cb_data;	/* data passed to cb */

	struct list_head	node;		/* links into write_q or write_compl_q */
};

/* Queue buf[0..buflen) for transmission; cb (optional) fires on completion. */
extern int tcp_writeq(struct tcp_write_state *st, const void *buf, unsigned int buflen,
	       bool (*cb)(struct tcp_write_state *, void *, bool),
	       void *cb_data);
/* Convenience callback: free(cb_data); always returns false. */
extern bool tcp_wr_cb_free(struct tcp_write_state *st, void *cb_data, bool done);
/* Initialize/tear down per-connection write state. */
extern void tcp_write_init(struct tcp_write_state *st, int fd);
extern void tcp_write_exit(struct tcp_write_state *st);
/* Kick writing; returns true = caller should loop, false = wait for poll. */
extern bool tcp_write_start(struct tcp_write_state *st);



====================================SNIP CUT HERE SNIP=========================



/* Move a fully-written request from the pending queue to the completion
 * queue.  Its callback runs later (tcp_write_run_compl), not here, so
 * that a callback which queues more data cannot recurse into us.
 */
static void tcp_write_complete(struct tcp_write_state *st, struct tcp_write *wr)
{
	list_del(&wr->node);
	list_add_tail(&wr->node, &st->write_compl_q);
}

/* Stock completion callback: free the caller-supplied cb_data buffer.
 * Suitable when cb_data is heap-allocated and owned by the write request.
 * Always returns false (no further work to schedule).
 */
bool tcp_wr_cb_free(struct tcp_write_state *st, void *cb_data, bool done)
{
	(void) st;	/* unused; silence -Wunused-parameter */
	(void) done;	/* buffer is freed whether the write succeeded or not */

	free(cb_data);	/* free(NULL) is a harmless no-op */
	return false;
}

/* Unlink and destroy one write request, adjusting the queued-byte count.
 * Invokes the request's callback (done indicates success vs. abort) and
 * returns whatever the callback returned (false if there is none).
 */
static bool tcp_write_free(struct tcp_write_state *st, struct tcp_write *wr,
			   bool done)
{
	bool requeue = false;

	st->write_cnt -= wr->length;
	list_del(&wr->node);

	if (wr->cb)
		requeue = wr->cb(st, wr->cb_data, done);

	free(wr);
	return requeue;
}

static void tcp_write_free_all(struct tcp_write_state *st)
{
	struct tcp_write *wr, *tmp;

	list_for_each_entry_safe(wr, tmp, &st->write_compl_q, node) {
		tcp_write_free(st, wr, true);
	}
	list_for_each_entry_safe(wr, tmp, &st->write_q, node) {
		tcp_write_free(st, wr, false);
	}
}

/* Drain the completion queue, running each request's callback with
 * done=true.  Returns true if any callback requested another loop
 * iteration (i.e. returned true).
 */
bool tcp_write_run_compl(struct tcp_write_state *st)
{
	bool requeue = false;

	while (!list_empty(&st->write_compl_q)) {
		struct tcp_write *wr;

		wr = list_entry(st->write_compl_q.next, struct tcp_write,
				node);
		if (tcp_write_free(st, wr, true))
			requeue = true;
	}

	return requeue;
}

/* Attempt to push queued data out the socket with a single writev(2).
 * Fully-written requests move to the completion queue (callbacks run
 * later, see tcp_write_complete); partially-written requests keep their
 * remaining bytes at the head of write_q.  Clears write-event polling
 * once the queue empties.
 *
 * Returns true on success (including EAGAIN, i.e. "try again later");
 * false on hard error, after freeing/aborting every queued request.
 */
static bool tcp_writable(struct tcp_write_state *st)
{
	int n_iov;
	struct tcp_write *tmp;
	ssize_t rc;
	struct iovec iov[TCP_MAX_WR_IOV];

	/* accumulate pending writes into iovec, bounded by TCP_MAX_WR_IOV */
	n_iov = 0;
	list_for_each_entry(tmp, &st->write_q, node) {
		if (n_iov == TCP_MAX_WR_IOV)
			break;
		/* bleh, struct iovec should declare iov_base const */
		iov[n_iov].iov_base = (void *) tmp->buf;
		iov[n_iov].iov_len = tmp->togo;
		n_iov++;
	}

	/* execute non-blocking write */
do_write:
	rc = writev(st->fd, iov, n_iov);
	if (rc < 0) {
		if (errno == EINTR)
			goto do_write;	/* interrupted; just retry */
		if (errno != EAGAIN)
			goto err_out;	/* hard error: abort all queued writes */
		return true;		/* socket buffer full; poll will re-arm us */
	}

	/* iterate through write queue, issuing completions based on
	 * amount of data written
	 */
	while (rc > 0) {
		int sz;

		/* get pointer to first record on list */
		tmp = list_entry(st->write_q.next, struct tcp_write, node);

		/* mark data consumed by decreasing tmp->togo */
		sz = (tmp->togo < rc) ? tmp->togo : rc;
		tmp->togo -= sz;
		/* advance via char*: arithmetic on 'const void *' is a GNU
		 * extension, not ISO C */
		tmp->buf = (const char *) tmp->buf + sz;
		rc -= sz;

		/* if tmp->togo reaches zero, write is complete,
		 * so schedule it for clean up (cannot call callback
		 * right away or an endless recursion will result)
		 */
		if (tmp->togo == 0)
			tcp_write_complete(st, tmp);
	}

	/* if we emptied the queue, clear write notification */
	if (list_empty(&st->write_q)) {
		st->writing = false;
		if (event_del(&st->write_ev) < 0)
			goto err_out;
	}

	return true;

err_out:
	tcp_write_free_all(st);
	return false;
}

/* Begin (or continue) draining the write queue for this connection.
 *
 * Return protocol (caller's event loop contract):
 *   true  = caller should loop / continue immediately (no poll needed)
 *   false = caller should wait for the write-ready poll event
 */
bool tcp_write_start(struct tcp_write_state *st)
{
	/* nothing queued: nothing to wait for */
	if (list_empty(&st->write_q))
		return true;		/* loop, not poll */

	/* if write-poll already active, nothing further to do */
	if (st->writing)
		return false;		/* poll wait */

	/* attempt optimistic write, in hopes of avoiding poll,
	 * or at least refill the write buffers so as to not
	 * get -immediately- called again by the kernel
	 */
	tcp_writable(st);
	if (list_empty(&st->write_q)) {
		st->opt_write++;	/* stat: optimistic write emptied the queue */
		return true;		/* loop, not poll */
	}

	/* arm libevent write notification; on failure, fall back to looping */
	if (event_add(&st->write_ev, NULL) < 0)
		return true;		/* loop, not poll */

	st->writing = true;

	return false;			/* poll wait */
}

/* Queue buflen bytes at buf for asynchronous transmission.
 *
 * The buffer is NOT copied: the caller must keep it valid until cb fires
 * (done=true on success, done=false on abort).  Pass tcp_wr_cb_free as cb
 * to have cb_data freed automatically on completion.
 *
 * Returns 0 on success, -EINVAL for a NULL/empty buffer, -ENOMEM if the
 * request record cannot be allocated.  Does not write anything itself;
 * call tcp_write_start() to kick transmission.
 */
int tcp_writeq(struct tcp_write_state *st, const void *buf, unsigned int buflen,
	       bool (*cb)(struct tcp_write_state *, void *, bool),
	       void *cb_data)
{
	struct tcp_write *wr;

	if (!buf || !buflen)
		return -EINVAL;

	/* sizeof(*wr) rather than sizeof(struct ...): stays correct if the
	 * type ever changes */
	wr = calloc(1, sizeof(*wr));
	if (!wr)
		return -ENOMEM;

	wr->buf = buf;
	wr->togo = buflen;
	wr->length = buflen;
	wr->cb = cb;
	wr->cb_data = cb_data;
	list_add_tail(&wr->node, &st->write_q);

	/* accounting: track current and peak queued bytes */
	st->write_cnt += buflen;
	if (st->write_cnt > st->write_cnt_max)
		st->write_cnt_max = st->write_cnt;

	return 0;
}

/* Report the number of bytes currently queued but not yet written. */
size_t tcp_wqueued(struct tcp_write_state *st)
{
	size_t pending = st->write_cnt;

	return pending;
}

/* libevent callback: socket became writable, try to drain the queue.
 * fd/events are implied by the registration and ignored here.
 */
static void tcp_wr_evt(int fd, short events, void *userdata)
{
	tcp_writable((struct tcp_write_state *) userdata);
}

/* Prepare a zeroed write state for socket fd and register (but do not
 * arm) its libevent write handler.  Pair with tcp_write_exit().
 */
void tcp_write_init(struct tcp_write_state *st, int fd)
{
	memset(st, 0, sizeof(*st));

	st->fd = fd;
	st->write_cnt_max = TCP_MAX_WR_CNT;

	INIT_LIST_HEAD(&st->write_q);
	INIT_LIST_HEAD(&st->write_compl_q);

	/* EV_PERSIST: stay registered across events until event_del() */
	event_set(&st->write_ev, fd, EV_WRITE | EV_PERSIST,
		  tcp_wr_evt, st);
}

/* Tear down write state: disarm the libevent handler if active, then
 * abort/complete every queued request (callbacks fire with done=false
 * for unwritten data, done=true for completed-but-unreported data).
 */
void tcp_write_exit(struct tcp_write_state *st)
{
	if (st->writing)
		event_del(&st->write_ev);

	tcp_write_free_all(st);
}

====================================SNIP CUT HERE SNIP=========================

[Index of Archives]     [Fedora Cloud]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux