[PATCH v4 1/3] Add bidirectional_transfer_loop()

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This helper function copies bidirectional stream of data between
stdin/stdout and specified file descriptors.

Signed-off-by: Ilari Liusvaara <ilari.liusvaara@xxxxxxxxxxx>
---
 compat/mingw.h     |    5 +
 transport-helper.c |  324 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 transport.h        |    1 +
 3 files changed, 330 insertions(+), 0 deletions(-)

diff --git a/compat/mingw.h b/compat/mingw.h
index 3b2477b..f27a7b6 100644
--- a/compat/mingw.h
+++ b/compat/mingw.h
@@ -23,6 +23,9 @@ typedef int pid_t;
 #define WEXITSTATUS(x) ((x) & 0xff)
 #define WTERMSIG(x) SIGTERM
 
+#define EWOULDBLOCK EAGAIN
+#define SHUT_WR SD_SEND
+
 #define SIGHUP 1
 #define SIGQUIT 3
 #define SIGKILL 9
@@ -50,6 +53,8 @@ struct pollfd {
 };
 #define POLLIN 1
 #define POLLHUP 2
+#define POLLOUT 4
+#define POLLNVAL 8
 #endif
 
 typedef void (__cdecl *sig_handler_t)(int);
diff --git a/transport-helper.c b/transport-helper.c
index acfc88e..1f7bad6 100644
--- a/transport-helper.c
+++ b/transport-helper.c
@@ -862,3 +862,327 @@ int transport_helper_init(struct transport *transport, const char *name)
 	transport->smart_options = &(data->transport_options);
 	return 0;
 }
+
+/*
+ * Linux pipes can buffer 65536 bytes at once (and most platforms can
+ * buffer less), so attempt reads and writes with up to that size.
+ */
+#define BUFFERSIZE 65536
+/* This should be enough to hold debugging message. */
+#define PBUFFERSIZE 8192
+
+/* Print bidirectional transfer loop debug message. */
+static void transfer_debug(const char *fmt, ...)
+{
+	va_list args;
+	char msgbuf[PBUFFERSIZE];
+	static int debug_enabled = -1;
+
+	if (debug_enabled < 0)
+		debug_enabled = getenv("GIT_TRANSLOOP_DEBUG") ? 1 : 0;
+	if (!debug_enabled)
+		return;
+
+	va_start(args, fmt);
+	vsnprintf(msgbuf, PBUFFERSIZE, fmt, args);
+	va_end(args);
+	fprintf(stderr, "Transfer loop debugging: %s\n", msgbuf);
+}
+
+/* Stream state: More data may be coming in this direction. */
+#define SSTATE_TRANSFERING 0
+/*
+ * Stream state: No more data coming in this direction, flushing rest of
+ * data.
+ */
+#define SSTATE_FLUSHING 1
+/* Stream state: Transfer in this direction finished. */
+#define SSTATE_FINISHED 2
+
+#define STATE_NEEDS_READING(state) ((state) <= SSTATE_TRANSFERING)
+#define STATE_NEEDS_WRITING(state) ((state) <= SSTATE_FLUSHING)
+#define STATE_NEEDS_CLOSING(state) ((state) == SSTATE_FLUSHING)
+
+/* Unidirectional transfer. */
+struct unidirectional_transfer
+{
+	/* Source */
+	int src;
+	/* Destination */
+	int dest;
+	/* Is destination socket? */
+	int dest_is_sock;
+	/* Transfer state (TRANSFERING/FLUSHING/FINISHED) */
+	int state;
+	/* Buffer. */
+	char buf[BUFFERSIZE];
+	/* Buffer used. */
+	size_t bufuse;
+	/* Name of source. */
+	const char *src_name;
+	/* Name of destination. */
+	const char *dest_name;
+};
+
+static int udt_can_read(struct unidirectional_transfer *t)
+{
+	return (STATE_NEEDS_READING(t->state) && t->bufuse < BUFFERSIZE);
+}
+
+static int udt_can_write(struct unidirectional_transfer *t)
+{
+	return (STATE_NEEDS_WRITING(t->state) && t->bufuse > 0);
+}
+
+static void udt_close_if_finished(struct unidirectional_transfer *t)
+{
+	if (STATE_NEEDS_CLOSING(t->state) && !t->bufuse) {
+		t->state = SSTATE_FINISHED;
+		if (t->dest_is_sock)
+			shutdown(t->dest, SHUT_WR);
+		else
+			close(t->dest);
+		transfer_debug("Closed %s.", t->dest_name);
+	}
+}
+
+static int udt_do_read(struct unidirectional_transfer *t)
+{
+	int r;
+	transfer_debug("%s is readable", t->src_name);
+	r = read(t->src, t->buf + t->bufuse, BUFFERSIZE - t->bufuse);
+	if (r < 0 && errno != EWOULDBLOCK && errno != EAGAIN &&
+		errno != EINTR) {
+		error("read(%s) failed: %s", t->src_name, strerror(errno));
+		return -1;
+	} else if (r == 0) {
+		transfer_debug("%s EOF (with %i bytes in buffer)",
+			t->src_name, t->bufuse);
+		t->state = SSTATE_FLUSHING;
+	} else if (r > 0) {
+		t->bufuse += r;
+		transfer_debug("Read %i bytes from %s (buffer now at %i)",
+			r, t->src_name, (int)t->bufuse);
+	}
+	return 0;
+}
+
+static int udt_do_write(struct unidirectional_transfer *t)
+{
+	int r;
+	transfer_debug("%s is writable", t->dest_name);
+	r = write(t->dest, t->buf, t->bufuse);
+	if (r < 0 && errno != EWOULDBLOCK && errno != EAGAIN &&
+		errno != EINTR) {
+		error("write(%s) failed: %s", t->dest_name, strerror(errno));
+		return -1;
+	} else if (r > 0) {
+		t->bufuse -= r;
+		if(t->bufuse)
+			memmove(t->buf, t->buf + r, t->bufuse);
+		transfer_debug("Wrote %i bytes to %s (buffer now at %i)",
+			r, t->dest_name, (int)t->bufuse);
+	}
+	return 0;
+}
+
+/* State of bidirectional transfer loop. */
+struct bidirectional_transfer_state
+{
+	/*
+	 * Current polls. Up to 4 because both read and write for both
+	 * directions possibly needs to poll, and all of these may occur
+	 * at once.
+	 */
+	struct pollfd polls[4];
+	/* Number of polls active. */
+	int polls_active;
+	/* Index for stdin in poll array, -1 if none. */
+	int stdin_index;
+	/* Index for stdout in poll array, -1 if none. */
+	int stdout_index;
+	/* Index for input descriptor in poll array, -1 if none. */
+	int input_index;
+	/* Index for output descrptor in poll array, -1 if none. */
+	int output_index;
+	/* Direction from program to git. */
+	struct unidirectional_transfer ptg;
+	/* Direction from git to program. */
+	struct unidirectional_transfer gtp;
+};
+
+/*
+ * Allocate individual poll array entry.
+ * polls -> The polls array. Must have at least *active + 1 entries.
+ * active -> Place for number of active polls. Value in this location is
+ * incremented.
+ * index -> Place to store index for newly allocated entry.
+ * fd -> The file descriptor to place there.
+ * name -> Name for fd to print. If NULL, no message is printed.
+ */
+static void allocate_poll_array_entry(struct pollfd* polls, int* active,
+	int* index, int fd, const char* name)
+{
+	*index = (*active)++;
+	polls[*index].fd = fd;
+	if (name)
+		transfer_debug("Adding %s to fds to wait for...", name);
+}
+
+
+/* Allocate indexes for file descriptors in poll array. */
+static void allocate_poll_indexes(struct bidirectional_transfer_state* s)
+{
+	/* Initialize fields like there was nothing waiting. */
+	s->stdin_index = -1;
+	s->stdout_index = -1;
+	s->input_index = -1;
+	s->output_index = -1;
+	s->polls_active = 0;
+
+	if (udt_can_read(&s->ptg))
+		allocate_poll_array_entry(s->polls, &s->polls_active,
+			&s->input_index, s->ptg.src, s->ptg.src_name);
+	if (udt_can_write(&s->ptg))
+		allocate_poll_array_entry(s->polls, &s->polls_active,
+			&s->stdout_index, s->ptg.dest, s->ptg.dest_name);
+	if (udt_can_read(&s->gtp))
+		allocate_poll_array_entry(s->polls, &s->polls_active,
+			&s->stdin_index, s->gtp.src, s->gtp.src_name);
+	if (udt_can_write(&s->gtp)) {
+		if (s->gtp.dest == s->ptg.src && s->input_index >= 0)
+			s->output_index = s->input_index;
+		else
+			allocate_poll_array_entry(s->polls, &s->polls_active,
+				&s->output_index, s->gtp.dest,
+				s->gtp.dest_name);
+		transfer_debug("Adding %s to fds to wait for",
+			s->gtp.dest_name);
+	}
+}
+
+/*
+ * Set specified event flags for specified poll array entry if index is
+ * valid. If msg is not null, it is printed as debug message.
+ */
+static void mark_for_wait(struct pollfd* polls, int index, int flags,
+	const char* msg)
+{
+	if (index < 0)
+		return;		/* Index not in use. */
+	polls[index].events |= flags;
+	transfer_debug("Setting fd wait flags: fd=%i, flags=%i, index=%i",
+		polls[index].fd, flags, index);
+	if (msg)
+		transfer_debug("%s", msg);
+}
+
+/*
+ * Load the parameters into poll array and related fields based on rest of
+ * fields.
+ */
+static void load_poll_params(struct bidirectional_transfer_state* s)
+{
+	int i;
+
+	allocate_poll_indexes(s);
+	for (i = 0; i < s->polls_active; i++)
+		s->polls[i].events = s->polls[i].revents = 0;
+
+	mark_for_wait(s->polls, s->stdin_index, POLLIN,
+		"Waiting for stdin to become readable");
+	mark_for_wait(s->polls, s->input_index, POLLIN,
+		"Waiting for remote input to become readable");
+	mark_for_wait(s->polls, s->stdout_index, POLLOUT,
+		"Waiting for stdout to become writable");
+	mark_for_wait(s->polls, s->output_index, POLLOUT,
+		"Waiting for remote output to become writable");
+}
+
+/* Call handler if ready. */
+static int call_handler_if(int r, struct bidirectional_transfer_state* s,
+	int index, int flagmask, const char* name,
+	int (*on_ready)(struct unidirectional_transfer *t),
+	struct unidirectional_transfer *transfer)
+{
+	if(r)
+		return r;
+	if(index < 0)
+		return 0;		/* This is not being waited. */
+	if(s->polls[index].revents & POLLNVAL) {
+		error("%s got unexpectedly closed", name);
+		return -1;
+	}
+	if(!(s->polls[index].revents & flagmask))
+		return 0;		/* No events returned. */
+	return on_ready(transfer);
+}
+
+/* Handle events occured during poll. Returns -1 on error, 0 on success. */
+static int transfer_handle_events(struct bidirectional_transfer_state* s)
+{
+	int r = 0;
+	r = call_handler_if(r, s, s->stdin_index, POLLIN | POLLHUP,
+		s->gtp.src_name, udt_do_read, &s->gtp);
+	r = call_handler_if(r, s, s->input_index, POLLIN | POLLHUP,
+		s->ptg.src_name, udt_do_read, &s->ptg);
+	r = call_handler_if(r, s, s->output_index, POLLOUT,
+		s->gtp.dest_name, udt_do_write, &s->gtp);
+	r = call_handler_if(r, s, s->stdout_index, POLLOUT,
+		s->ptg.dest_name, udt_do_write, &s->ptg);
+	udt_close_if_finished(&s->ptg);
+	udt_close_if_finished(&s->gtp);
+	return r;
+}
+
+/*
+ * Copies data from stdin to output and from input to stdout simultaneously.
+ * Additionally filtering through given filter. If filter is NULL, uses
+ * identity filter.
+ */
+int bidirectional_transfer_loop(int input, int output)
+{
+	struct bidirectional_transfer_state state;
+
+	/* Fill the state fields. */
+	state.ptg.src = input;
+	state.ptg.dest = 1;
+	state.ptg.dest_is_sock = 0;
+	state.ptg.state = SSTATE_TRANSFERING;
+	state.ptg.bufuse = 0;
+	state.ptg.src_name = "remote input";
+	state.ptg.dest_name = "stdout";
+
+	state.gtp.src = 0;
+	state.gtp.dest = output;
+	state.gtp.dest_is_sock = (input == output);
+	state.gtp.state = SSTATE_TRANSFERING;
+	state.gtp.bufuse = 0;
+	state.gtp.src_name = "stdin";
+	state.gtp.dest_name = "remote output";
+
+	while (1) {
+		int r;
+		load_poll_params(&state);
+		if (!state.polls_active) {
+			transfer_debug("Transfer done");
+			break;
+		}
+		transfer_debug("Waiting for %i file descriptors",
+			state.polls_active);
+		r = poll(state.polls, state.polls_active, -1);
+		if (r < 0) {
+			if (errno == EWOULDBLOCK || errno == EAGAIN ||
+				errno == EINTR)
+				continue;
+			error("poll failed: %s", strerror(errno));
+			return -1;
+		} else if (r == 0)
+			continue;
+
+		r = transfer_handle_events(&state);
+		if (r)
+			return r;
+	}
+	return 0;
+}
diff --git a/transport.h b/transport.h
index c59d973..e803c0e 100644
--- a/transport.h
+++ b/transport.h
@@ -154,6 +154,7 @@ int transport_connect(struct transport *transport, const char *name,
 
 /* Transport methods defined outside transport.c */
 int transport_helper_init(struct transport *transport, const char *name);
+int bidirectional_transfer_loop(int input, int output);
 
 /* common methods used by transport.c and builtin-send-pack.c */
 void transport_verify_remote_names(int nr_heads, const char **heads);
-- 
1.7.3.1.48.g63ac7.dirty

--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [Linux Kernel Development]     [Gcc Help]     [IETF Annouce]     [DCCP]     [Netdev]     [Networking]     [Security]     [V4L]     [Bugtraq]     [Yosemite]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Linux SCSI]     [Fedora Users]