Rewrite the bidirectional traffic loop to be clearer, fix some logic errors that could lead to pushes failing or unnecessary CPU usage, and add a debugging mode (activated by setting $GIT_TRANSLOOP_DEBUG). Signed-off-by: Ilari Liusvaara <ilari.liusvaara@xxxxxxxxxxx> --- transport-helper.c | 296 +++++++++++++++++++++++++++++++++++++--------------- 1 files changed, 210 insertions(+), 86 deletions(-) diff --git a/transport-helper.c b/transport-helper.c index 3591e0d..0facf65 100644 --- a/transport-helper.c +++ b/transport-helper.c @@ -865,130 +865,254 @@ int transport_helper_init(struct transport *transport, const char *name) #define BUFFERSIZE 4096 +#define PBUFFERSIZE 8192 -/* Copy data from stdin to output and from input to stdout. */ -int bidirectional_transfer_loop(int input, int output) +/* Print bidirectional transfer loop debug message. */ +static void transfer_debug(const char *fmt, ...) { - struct pollfd polls[4]; - char in_buffer[BUFFERSIZE]; - char out_buffer[BUFFERSIZE]; - size_t in_buffer_use = 0; - size_t out_buffer_use = 0; - int in_hup = 0; - int out_hup = 0; - int socket_mode = 0; - int input_index = 2; - int output_index = 3; - int poll_count = 4; + va_list args; + char msgbuf[PBUFFERSIZE]; + static int debug_enabled = -1; + + if (debug_enabled < 0) + debug_enabled = getenv("GIT_TRANSLOOP_DEBUG") ? 1 : 0; + if (!debug_enabled) + return; + + sprintf(msgbuf, "Transfer loop debugging: "); + va_start(args, fmt); + vsprintf(msgbuf + strlen(msgbuf), fmt, args); + va_end(args); + fprintf(stderr, "%s\n", msgbuf); +} - if(input == output) { - output_index = input_index; - poll_count = 3; - socket_mode = 1; +/* Load the parameters into poll structure. 
Return number of entries loaded */ +static int load_poll_params(struct pollfd *polls, size_t inbufuse, + size_t outbufuse, int in_hup, int out_hup, int in_closed, + int out_closed, int socket_mode, int input_fd, int output_fd) +{ + int stdin_index = -1; + int stdout_index = -1; + int input_index = -1; + int output_index = -1; + int nextindex = 0; + int i; + + /* + * Inputs can't be waited at all if buffer is full since we can't + * do read on 0 bytes as it could do strange things. + */ + if (!in_hup && inbufuse < BUFFERSIZE) { + stdin_index = nextindex++; + polls[stdin_index].fd = 0; + transfer_debug("Adding stdin to fds to wait for"); + } + if (!out_hup && outbufuse < BUFFERSIZE) { + input_index = nextindex++; + polls[input_index].fd = input_fd; + transfer_debug("Adding remote input to fds to wait for"); + } + if (!out_closed && outbufuse > 0) { + stdout_index = nextindex++; + polls[stdout_index].fd = 1; + transfer_debug("Adding stdout to fds to wait for"); + } + if (!in_closed && inbufuse > 0) { + if (socket_mode && input_index >= 0) + output_index = input_index; + else { + output_index = nextindex++; + polls[output_index].fd = output_fd; + } + transfer_debug("Adding remote output to fds to wait for"); } - while(in_buffer_use || out_buffer_use || !in_hup || !out_hup) { - int r, i; - /* Set up the poll and do it. 
*/ - polls[0].fd = 0; - polls[1].fd = 1; - polls[input_index].fd = input; - polls[output_index].fd = output; - for(i = 0; i < 4; i++) - polls[i].events = polls[i].revents = 0; - - if(in_buffer_use > 0) - polls[output_index].events |= POLLOUT; - if(in_buffer_use < BUFFERSIZE && !in_hup) - polls[0].events |= POLLIN; - if(out_buffer_use > 0) - polls[1].events |= POLLOUT; - if(out_buffer_use < BUFFERSIZE && !out_hup) - polls[input_index].events |= POLLIN; - r = poll(polls, poll_count, -1); - if(r < 0) { - if(errno == EWOULDBLOCK || errno == EAGAIN || - errno == EINTR) - continue; - perror("poll failed"); - return 1; - } else if(r == 0) - continue; + for (i = 0; i < nextindex; i++) + polls[i].events = polls[i].revents = 0; - /* Something interesting has happened... */ - if(polls[0].revents & (POLLIN | POLLHUP)) { - /* Stdin is readable. */ - r = read(0, in_buffer + in_buffer_use, BUFFERSIZE - - in_buffer_use); - if(r < 0 && errno != EWOULDBLOCK && errno != EAGAIN && + if (stdin_index >= 0) { + polls[stdin_index].events |= POLLIN; + transfer_debug("Waiting for stdin to become readable"); + } + if (input_index >= 0) { + polls[input_index].events |= POLLIN; + transfer_debug("Waiting for remote input to become readable"); + } + if (stdout_index >= 0) { + polls[stdout_index].events |= POLLOUT; + transfer_debug("Waiting for stdout to become writable"); + } + if (output_index >= 0) { + polls[output_index].events |= POLLOUT; + transfer_debug("Waiting for remote output to become writable"); + } + + /* Return number of indexes assigned. */ + return nextindex; +} + +static int transfer_handle_events(struct pollfd* polls, char *in_buffer, + char *out_buffer, size_t *in_buffer_use, size_t *out_buffer_use, + int *in_hup, int *out_hup, int *in_closed, int *out_closed, + int socket_mode, int poll_count, int input, int output) +{ + int i, r; + for(i = 0; i < poll_count; i++) { + /* Handle stdin. 
*/ + if (polls[i].fd == 0 && polls[i].revents & (POLLIN | POLLHUP)) { + transfer_debug("stdin is readable"); + r = read(0, in_buffer + *in_buffer_use, BUFFERSIZE - + *in_buffer_use); + if (r < 0 && errno != EWOULDBLOCK && errno != EAGAIN && errno != EINTR) { perror("read(git) failed"); return 1; - } else if(r == 0) { - in_hup = 1; - if(!in_buffer_use) { - if(socket_mode) + } else if (r == 0) { + transfer_debug("stdin EOF"); + *in_hup = 1; + if (!*in_buffer_use) { + if (socket_mode) shutdown(output, SHUT_WR); else close(output); - } - } else - in_buffer_use += r; + *in_closed = 1; + transfer_debug("Closed remote output"); + } else + transfer_debug("Delaying remote output close because input buffer has data"); + } else if (r > 0) { + *in_buffer_use += r; + transfer_debug("Read %i bytes from stdin (buffer now at %i)", r, (int)*in_buffer_use); + } } - if(polls[input_index].revents & (POLLIN | POLLHUP)) { - /* Connection is readable. */ - r = read(input, out_buffer + out_buffer_use, - BUFFERSIZE - out_buffer_use); - if(r < 0 && errno != EWOULDBLOCK && errno != EAGAIN && + /* Handle remote end input. */ + if (polls[i].fd == input && + polls[i].revents & (POLLIN | POLLHUP)) { + transfer_debug("remote input is readable"); + r = read(input, out_buffer + *out_buffer_use, + BUFFERSIZE - *out_buffer_use); + if (r < 0 && errno != EWOULDBLOCK && errno != EAGAIN && errno != EINTR) { perror("read(connection) failed"); return 1; - } else if(r == 0) { - out_hup = 1; - if(!out_buffer_use) + } else if (r == 0) { + transfer_debug("remote input EOF"); + *out_hup = 1; + if (!*out_buffer_use) { close(1); - } else - out_buffer_use += r; + *out_closed = 1; + transfer_debug("Closed stdout"); + } else + transfer_debug("Delaying stdout close because output buffer has data"); + + } else if (r > 0) { + *out_buffer_use += r; + transfer_debug("Read %i bytes from remote input (buffer now at %i)", r, (int)*out_buffer_use); + } } - if(polls[1].revents & POLLOUT) { - /* Stdout is writable. 
*/ - r = write(1, out_buffer, out_buffer_use); - if(r < 0 && errno != EWOULDBLOCK && errno != EAGAIN && + /* Handle stdout. */ + if (polls[i].fd == 1 && polls[i].revents & POLLNVAL) { + error("Write pipe to Git unexpectedly closed."); + return 1; + } + if (polls[i].fd == 1 && polls[i].revents & POLLOUT) { + transfer_debug("stdout is writable"); + r = write(1, out_buffer, *out_buffer_use); + if (r < 0 && errno != EWOULDBLOCK && errno != EAGAIN && errno != EINTR) { perror("write(git) failed"); return 1; - } else { - out_buffer_use -= r; - if(out_buffer_use > 0) + } else if (r > 0){ + *out_buffer_use -= r; + transfer_debug("Wrote %i bytes to stdout (buffer now at %i)", r, (int)*out_buffer_use); + if (*out_buffer_use > 0) memmove(out_buffer, out_buffer + r, - out_buffer_use); - if(out_hup && !out_buffer_use) + *out_buffer_use); + if (*out_hup && !*out_buffer_use) { close(1); + *out_closed = 1; + transfer_debug("Closed stdout"); + } } } - if(polls[output_index].revents & POLLOUT) { - /* Connection is writable. */ - r = write(output, in_buffer, in_buffer_use); - if(r < 0 && errno != EWOULDBLOCK && errno != EAGAIN && + /* Handle remote end output. 
*/ + if (polls[i].fd == output && polls[i].revents & POLLNVAL) { + error("Write pipe to remote end unexpectedly closed."); + return 1; + } + if (polls[i].fd == output && polls[i].revents & POLLOUT) { + transfer_debug("remote output is writable"); + r = write(output, in_buffer, *in_buffer_use); + if (r < 0 && errno != EWOULDBLOCK && errno != EAGAIN && errno != EINTR) { perror("write(connection) failed"); return 1; - } else { - in_buffer_use -= r; - if(in_buffer_use > 0) + } else if (r > 0) { + *in_buffer_use -= r; + transfer_debug("Wrote %i bytes to remote output (buffer now at %i)", r, (int)*in_buffer_use); + if (*in_buffer_use > 0) memmove(in_buffer, in_buffer + r, - in_buffer_use); - if(in_hup && !in_buffer_use) { - if(socket_mode) + *in_buffer_use); + if (*in_hup && !*in_buffer_use) { + if (socket_mode) shutdown(output, SHUT_WR); else close(output); + *in_closed = 1; + transfer_debug("Closed remote output"); } } } } return 0; } + +/* Copy data from stdin to output and from input to stdout. 
*/ +int bidirectional_transfer_loop(int input, int output) +{ + struct pollfd polls[4]; + char in_buffer[BUFFERSIZE]; + char out_buffer[BUFFERSIZE]; + size_t in_buffer_use = 0; + size_t out_buffer_use = 0; + int in_hup = 0; + int out_hup = 0; + int in_closed = 0; + int out_closed = 0; + int socket_mode = 0; + int poll_count = 4; + + if (input == output) + socket_mode = 1; + + while (1) { + int r; + poll_count = load_poll_params(polls, in_buffer_use, + out_buffer_use, in_hup, out_hup, in_closed, out_closed, + socket_mode, input, output); + if (!poll_count) { + transfer_debug("Transfer done"); + break; + } + transfer_debug("Waiting for %i file descriptors", poll_count); + r = poll(polls, poll_count, -1); + if (r < 0) { + if (errno == EWOULDBLOCK || errno == EAGAIN || + errno == EINTR) + continue; + perror("poll failed"); + return 1; + } else if (r == 0) + continue; + + r = transfer_handle_events(polls, in_buffer, out_buffer, + &in_buffer_use, &out_buffer_use, &in_hup, &out_hup, + &in_closed, &out_closed, socket_mode, poll_count, + input, output); + if (r) + return r; + } + return 0; +} -- 1.7.2.1.9.g1ccab -- To unsubscribe from this list: send the line "unsubscribe git" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html