This helper function copies bidirectional stream of data between stdin/stdout and specified file descriptors. Signed-off-by: Ilari Liusvaara <ilari.liusvaara@xxxxxxxxxxx> --- compat/mingw.h | 5 + transport-helper.c | 254 ++++++++++++++++++++++++++++++++++++++++++++++++++++ transport.h | 1 + 3 files changed, 260 insertions(+), 0 deletions(-) diff --git a/compat/mingw.h b/compat/mingw.h index 3b2477b..f27a7b6 100644 --- a/compat/mingw.h +++ b/compat/mingw.h @@ -23,6 +23,9 @@ typedef int pid_t; #define WEXITSTATUS(x) ((x) & 0xff) #define WTERMSIG(x) SIGTERM +#define EWOULDBLOCK EAGAIN +#define SHUT_WR SD_SEND + #define SIGHUP 1 #define SIGQUIT 3 #define SIGKILL 9 @@ -50,6 +53,8 @@ struct pollfd { }; #define POLLIN 1 #define POLLHUP 2 +#define POLLOUT 4 +#define POLLNVAL 8 #endif typedef void (__cdecl *sig_handler_t)(int); diff --git a/transport-helper.c b/transport-helper.c index acfc88e..1ebcebc 100644 --- a/transport-helper.c +++ b/transport-helper.c @@ -862,3 +862,257 @@ int transport_helper_init(struct transport *transport, const char *name) transport->smart_options = &(data->transport_options); return 0; } + + +#define BUFFERSIZE 4096 +#define PBUFFERSIZE 8192 + +/* Print bidirectional transfer loop debug message. */ +static void transfer_debug(const char *fmt, ...) +{ + va_list args; + char msgbuf[PBUFFERSIZE]; + static int debug_enabled = -1; + + if (debug_enabled < 0) + debug_enabled = getenv("GIT_TRANSLOOP_DEBUG") ? 1 : 0; + if (!debug_enabled) + return; + + sprintf(msgbuf, "Transfer loop debugging: "); + va_start(args, fmt); + vsprintf(msgbuf + strlen(msgbuf), fmt, args); + va_end(args); + fprintf(stderr, "%s\n", msgbuf); +} + +/* Load the parameters into poll structure. Return number of entries loaded */ +static int load_poll_params(struct pollfd *polls, size_t inbufuse, + size_t outbufuse, int in_hup, int out_hup, int in_closed, + int out_closed, int socket_mode, int input_fd, int output_fd) +{ + int stdin_index = -1; + int stdout_index = -1; + int input_index = -1; + int output_index = -1; + int nextindex = 0; + int i; + + /* + * Inputs can't be waited at all if buffer is full since we can't + * do read on 0 bytes as it could do strange things. + */ + if (!in_hup && inbufuse < BUFFERSIZE) { + stdin_index = nextindex++; + polls[stdin_index].fd = 0; + transfer_debug("Adding stdin to fds to wait for"); + } + if (!out_hup && outbufuse < BUFFERSIZE) { + input_index = nextindex++; + polls[input_index].fd = input_fd; + transfer_debug("Adding remote input to fds to wait for"); + } + if (!out_closed && outbufuse > 0) { + stdout_index = nextindex++; + polls[stdout_index].fd = 1; + transfer_debug("Adding stdout to fds to wait for"); + } + if (!in_closed && inbufuse > 0) { + if (socket_mode && input_index >= 0) + output_index = input_index; + else { + output_index = nextindex++; + polls[output_index].fd = output_fd; + } + transfer_debug("Adding remote output to fds to wait for"); + } + + for (i = 0; i < nextindex; i++) + polls[i].events = polls[i].revents = 0; + + if (stdin_index >= 0) { + polls[stdin_index].events |= POLLIN; + transfer_debug("Waiting for stdin to become readable"); + } + if (input_index >= 0) { + polls[input_index].events |= POLLIN; + transfer_debug("Waiting for remote input to become readable"); + } + if (stdout_index >= 0) { + polls[stdout_index].events |= POLLOUT; + transfer_debug("Waiting for stdout to become writable"); + } + if (output_index >= 0) { + polls[output_index].events |= POLLOUT; + transfer_debug("Waiting for remote output to become writable"); + } + + /* Return number of indexes assigned. */ + return nextindex; +} + +static int transfer_handle_events(struct pollfd* polls, char *in_buffer, + char *out_buffer, size_t *in_buffer_use, size_t *out_buffer_use, + int *in_hup, int *out_hup, int *in_closed, int *out_closed, + int socket_mode, int poll_count, int input, int output) +{ + int i, r; + for(i = 0; i < poll_count; i++) { + /* Handle stdin. */ + if (polls[i].fd == 0 && polls[i].revents & (POLLIN | POLLHUP)) { + transfer_debug("stdin is readable"); + r = read(0, in_buffer + *in_buffer_use, BUFFERSIZE - + *in_buffer_use); + if (r < 0 && errno != EWOULDBLOCK && errno != EAGAIN && + errno != EINTR) { + perror("read(git) failed"); + return 1; + } else if (r == 0) { + transfer_debug("stdin EOF"); + *in_hup = 1; + if (!*in_buffer_use) { + if (socket_mode) + shutdown(output, SHUT_WR); + else + close(output); + *in_closed = 1; + transfer_debug("Closed remote output"); + } else + transfer_debug("Delaying remote output close because input buffer has data"); + } else if (r > 0) { + *in_buffer_use += r; + transfer_debug("Read %i bytes from stdin (buffer now at %i)", r, (int)*in_buffer_use); + } + } + + /* Handle remote end input. */ + if (polls[i].fd == input && + polls[i].revents & (POLLIN | POLLHUP)) { + transfer_debug("remote input is readable"); + r = read(input, out_buffer + *out_buffer_use, + BUFFERSIZE - *out_buffer_use); + if (r < 0 && errno != EWOULDBLOCK && errno != EAGAIN && + errno != EINTR) { + perror("read(connection) failed"); + return 1; + } else if (r == 0) { + transfer_debug("remote input EOF"); + *out_hup = 1; + if (!*out_buffer_use) { + close(1); + *out_closed = 1; + transfer_debug("Closed stdout"); + } else + transfer_debug("Delaying stdout close because output buffer has data"); + + } else if (r > 0) { + *out_buffer_use += r; + transfer_debug("Read %i bytes from remote input (buffer now at %i)", r, (int)*out_buffer_use); + } + } + + /* Handle stdout. */ + if (polls[i].fd == 1 && polls[i].revents & POLLNVAL) { + error("Write pipe to Git unexpectedly closed."); + return 1; + } + if (polls[i].fd == 1 && polls[i].revents & POLLOUT) { + transfer_debug("stdout is writable"); + r = write(1, out_buffer, *out_buffer_use); + if (r < 0 && errno != EWOULDBLOCK && errno != EAGAIN && + errno != EINTR) { + perror("write(git) failed"); + return 1; + } else if (r > 0){ + *out_buffer_use -= r; + transfer_debug("Wrote %i bytes to stdout (buffer now at %i)", r, (int)*out_buffer_use); + if (*out_buffer_use > 0) + memmove(out_buffer, out_buffer + r, + *out_buffer_use); + if (*out_hup && !*out_buffer_use) { + close(1); + *out_closed = 1; + transfer_debug("Closed stdout"); + } + } + } + + /* Handle remote end output. */ + if (polls[i].fd == output && polls[i].revents & POLLNVAL) { + error("Write pipe to remote end unexpectedly closed."); + return 1; + } + if (polls[i].fd == output && polls[i].revents & POLLOUT) { + transfer_debug("remote output is writable"); + r = write(output, in_buffer, *in_buffer_use); + if (r < 0 && errno != EWOULDBLOCK && errno != EAGAIN && + errno != EINTR) { + perror("write(connection) failed"); + return 1; + } else if (r > 0) { + *in_buffer_use -= r; + transfer_debug("Wrote %i bytes to remote output (buffer now at %i)", r, (int)*in_buffer_use); + if (*in_buffer_use > 0) + memmove(in_buffer, in_buffer + r, + *in_buffer_use); + if (*in_hup && !*in_buffer_use) { + if (socket_mode) + shutdown(output, SHUT_WR); + else + close(output); + *in_closed = 1; + transfer_debug("Closed remote output"); + } + } + } + } + return 0; +} + +/* Copy data from stdin to output and from input to stdout. */ +int bidirectional_transfer_loop(int input, int output) +{ + struct pollfd polls[4]; + char in_buffer[BUFFERSIZE]; + char out_buffer[BUFFERSIZE]; + size_t in_buffer_use = 0; + size_t out_buffer_use = 0; + int in_hup = 0; + int out_hup = 0; + int in_closed = 0; + int out_closed = 0; + int socket_mode = 0; + int poll_count = 4; + + if (input == output) + socket_mode = 1; + + while (1) { + int r; + poll_count = load_poll_params(polls, in_buffer_use, + out_buffer_use, in_hup, out_hup, in_closed, out_closed, + socket_mode, input, output); + if (!poll_count) { + transfer_debug("Transfer done"); + break; + } + transfer_debug("Waiting for %i file descriptors", poll_count); + r = poll(polls, poll_count, -1); + if (r < 0) { + if (errno == EWOULDBLOCK || errno == EAGAIN || + errno == EINTR) + continue; + perror("poll failed"); + return 1; + } else if (r == 0) + continue; + + r = transfer_handle_events(polls, in_buffer, out_buffer, + &in_buffer_use, &out_buffer_use, &in_hup, &out_hup, + &in_closed, &out_closed, socket_mode, poll_count, + input, output); + if (r) + return r; + } + return 0; +} diff --git a/transport.h b/transport.h index c59d973..e803c0e 100644 --- a/transport.h +++ b/transport.h @@ -154,6 +154,7 @@ int transport_connect(struct transport *transport, const char *name, /* Transport methods defined outside transport.c */ int transport_helper_init(struct transport *transport, const char *name); +int bidirectional_transfer_loop(int input, int output); /* common methods used by transport.c and builtin-send-pack.c */ void transport_verify_remote_names(int nr_heads, const char **heads); -- 1.7.2.3.401.g919b6e -- To unsubscribe from this list: send the line "unsubscribe git" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html