In blocking mode, the IO operations block indefinitely if the server closes the virtio port on its side. Change to non-blocking mode, so that we can quit the streaming agent in case the port gets closed. Signed-off-by: Lukáš Hrázký <lhrazky@xxxxxxxxxx> --- src/error.hpp | 2 ++ src/spice-streaming-agent.cpp | 4 ++-- src/stream-port.cpp | 49 ++++++++++++++++++++++++++++++++++---- src/unittests/test-stream-port.cpp | 22 +++++++++++++---- 4 files changed, 66 insertions(+), 11 deletions(-) diff --git a/src/error.hpp b/src/error.hpp index 333a481..e30990f 100644 --- a/src/error.hpp +++ b/src/error.hpp @@ -24,6 +24,8 @@ public: class IOError : public Error { public: + using Error::Error; + IOError(const std::string &msg, int sys_errno); }; diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp index 26258d0..692f067 100644 --- a/src/spice-streaming-agent.cpp +++ b/src/spice-streaming-agent.cpp @@ -72,7 +72,7 @@ static int have_something_to_read(int timeout) return -1; } - if (pollfd.revents == POLLIN) { + if (pollfd.revents & POLLIN) { return 1; } @@ -330,7 +330,7 @@ static void cursor_changes(Display *display, int event_base) static void do_capture(const char *streamport, FILE *f_log) { - streamfd = open(streamport, O_RDWR); + streamfd = open(streamport, O_RDWR | O_NONBLOCK); if (streamfd < 0) { throw std::runtime_error("failed to open the streaming device (" + std::string(streamport) + "): " diff --git a/src/stream-port.cpp b/src/stream-port.cpp index cee63ac..72364bd 100644 --- a/src/stream-port.cpp +++ b/src/stream-port.cpp @@ -8,6 +8,7 @@ #include "error.hpp" #include <errno.h> +#include <poll.h> #include <string.h> #include <syslog.h> #include <unistd.h> @@ -22,9 +23,31 @@ void read_all(int fd, void *buf, size_t len) while (len > 0) { ssize_t n = read(fd, buf, len); + if (n == 0) { + throw ReadError("Reading message from device failed: read() returned 0, device is closed."); + } + if (n < 0) { - if (errno == EINTR) { - continue; + if (errno == 
EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { + struct pollfd pollfd = {fd, POLLIN, 0}; + if (poll(&pollfd, 1, -1) < 0) { + if (errno == EINTR) { + continue; + } + + throw ReadError("poll failed while reading message from device", errno); + } + + if (pollfd.revents & POLLIN) { + continue; + } + + if (pollfd.revents & POLLHUP) { + throw ReadError("Reading message from device failed: The device is closed."); + } + + throw ReadError("Reading message from device failed: poll returned " + + std::to_string(pollfd.revents)); } throw ReadError("Reading message from device failed", errno); } @@ -40,8 +63,26 @@ void write_all(int fd, const void *buf, size_t len) ssize_t n = write(fd, buf, len); if (n < 0) { - if (errno == EINTR) { - continue; + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { + struct pollfd pollfd = {fd, POLLOUT, 0}; + if (poll(&pollfd, 1, -1) < 0) { + if (errno == EINTR) { + continue; + } + + throw WriteError("poll failed while writing message to device", errno); + } + + if (pollfd.revents & POLLOUT) { + continue; + } + + if (pollfd.revents & POLLHUP) { + throw WriteError("Writing message to device failed: The device is closed."); + } + + throw WriteError("Writing message to device failed: poll returned " + + std::to_string(pollfd.revents)); } throw WriteError("Writing message to device failed", errno); } diff --git a/src/unittests/test-stream-port.cpp b/src/unittests/test-stream-port.cpp index 3f9dadf..eb02142 100644 --- a/src/unittests/test-stream-port.cpp +++ b/src/unittests/test-stream-port.cpp @@ -1,8 +1,10 @@ #define CATCH_CONFIG_MAIN #include <catch/catch.hpp> #include <sys/socket.h> +#include <signal.h> #include "stream-port.hpp" +#include "error.hpp" namespace ssa = spice::streaming_agent; @@ -12,12 +14,18 @@ namespace ssa = spice::streaming_agent; * that is actually used for the real interface. 
*/ SCENARIO("test basic IO on the stream port", "[port][io]") { + // When trying to write to a socket that was closed on the other side, the + // process receives a SIGPIPE, which is a difference to the virtio port, + // which returns EAGAIN from write(). By ignoring the SIGPIPE we get EPIPE + // from write() instead. + signal(SIGPIPE, SIG_IGN); + GIVEN("An open port (socketpair)") { int fd[2]; const char *src_buf = "brekeke"; const size_t src_size = strlen(src_buf); - socketpair(AF_LOCAL, SOCK_STREAM, 0, fd); + socketpair(AF_LOCAL, SOCK_STREAM | SOCK_NONBLOCK, 0, fd); WHEN("reading data in one go") { CHECK(write(fd[0], src_buf, src_size) == src_size); @@ -50,16 +58,20 @@ SCENARIO("test basic IO on the stream port", "[port][io]") { CHECK(close(fd[0]) == 0); ssa::read_all(fd[1], buf, 4); CHECK(std::string(buf, 4) == "keke"); - // TODO blocks infinitely, we should recognize the remote end is closed - //ssa::read_all(fd[1], buf, 1); + CHECK_THROWS_AS(ssa::read_all(fd[1], buf, 1), ssa::ReadError); } + // This test behaves differently with socketpair than it does with the virtio port: + // real case: + // - write() on the virtio port returns EAGAIN + // - subsequent poll() on the port returns POLLHUP, which throws WriteError + // test case: + // - write() on the socketpair returns EPIPE, which throws WriteError WHEN("closing the remote end and trying to write") { ssa::write_all(fd[1], src_buf, src_size); char buf[10]; CHECK(close(fd[0]) == 0); - // TODO causes a SIGPIPE - //ssa::write_all(fd[1], src_buf, src_size); + CHECK_THROWS_AS(ssa::write_all(fd[1], src_buf, src_size), ssa::WriteError); } // clean up the descriptors in case they are still open -- 2.16.2 _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel