From: Christophe de Dinechin <dinechin@xxxxxxxxxx> The 'Stream' class is designed to abstract file I/O. In a subsequent patch, message formatting will be split out of the class, but in order to minimize code changes, this intermediate step simply moves the corresponding functions into the Stream class. Signed-off-by: Christophe de Dinechin <dinechin@xxxxxxxxxx> --- src/spice-streaming-agent.cpp | 108 +++++++++++++++++++++++------------------- 1 file changed, 58 insertions(+), 50 deletions(-) diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp index 21f9c31..4d24234 100644 --- a/src/spice-streaming-agent.cpp +++ b/src/spice-streaming-agent.cpp @@ -40,8 +40,6 @@ using namespace spice::streaming_agent; -static size_t write_all(int fd, const void *buf, const size_t len); - static ConcreteAgent agent; namespace spice @@ -72,31 +70,44 @@ class Stream public: Stream(const char *name) { - fd = open(name, O_RDWR); - if (fd < 0) { + streamfd = open(name, O_RDWR); + if (streamfd < 0) { throw std::runtime_error("failed to open streaming device"); } } ~Stream() { - close(fd); + close(streamfd); } - int file_descriptor() { return fd; } + + int read_command(bool blocking); + size_t write_all(const void *buf, const size_t len); + int send_format(unsigned w, unsigned h, uint8_t c); + int send_frame(const void *buf, const unsigned size); + void send_cursor(uint16_t width, uint16_t height, + uint16_t hotspot_x, uint16_t hotspot_y, + std::function<void(uint32_t *)> fill_cursor); + +private: + int have_something_to_read(int timeout); + void handle_stream_start_stop(uint32_t len); + void handle_stream_capabilities(uint32_t len); + void handle_stream_error(uint32_t len); + void read_command_from_device(void); private: - int fd = -1; + int streamfd = -1; + std::mutex mutex; }; }} // namespace spice::streaming_agent - static bool streaming_requested = false; static bool quit_requested = false; static bool log_binary = false; static std::set<SpiceVideoCodecType> 
client_codecs; -static std::mutex stream_mtx; -static int have_something_to_read(int streamfd, int timeout) +int Stream::have_something_to_read(int timeout) { struct pollfd pollfd = {streamfd, POLLIN, 0}; @@ -112,7 +123,7 @@ static int have_something_to_read(int streamfd, int timeout) return 0; } -static void handle_stream_start_stop(int streamfd, uint32_t len) +void Stream::handle_stream_start_stop(uint32_t len) { uint8_t msg[256]; @@ -134,7 +145,7 @@ static void handle_stream_start_stop(int streamfd, uint32_t len) } } -static void handle_stream_capabilities(int streamfd, uint32_t len) +void Stream::handle_stream_capabilities(uint32_t len) { uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES]; @@ -154,23 +165,23 @@ static void handle_stream_capabilities(int streamfd, uint32_t len) STREAM_TYPE_CAPABILITIES, 0 }; - if (write_all(streamfd, &hdr, sizeof(hdr)) != sizeof(hdr)) { + if (write_all(&hdr, sizeof(hdr)) != sizeof(hdr)) { throw std::runtime_error("error writing capabilities"); } } -static void handle_stream_error(int streamfd, uint32_t len) +void Stream::handle_stream_error(uint32_t len) { // TODO read message and use it throw std::runtime_error("got an error message from server"); } -static void read_command_from_device(int streamfd) +void Stream::read_command_from_device() { StreamDevHeader hdr; int n; - std::lock_guard<std::mutex> stream_guard(stream_mtx); + std::lock_guard<std::mutex> stream_guard(mutex); n = read(streamfd, &hdr, sizeof(hdr)); if (n != sizeof(hdr)) { throw std::runtime_error("read command from device FAILED -- read " + std::to_string(n) + @@ -183,39 +194,38 @@ static void read_command_from_device(int streamfd) switch (hdr.type) { case STREAM_TYPE_CAPABILITIES: - return handle_stream_capabilities(streamfd, hdr.size); + return handle_stream_capabilities(hdr.size); case STREAM_TYPE_NOTIFY_ERROR: - return handle_stream_error(streamfd, hdr.size); + return handle_stream_error(hdr.size); case STREAM_TYPE_START_STOP: - return 
handle_stream_start_stop(streamfd, hdr.size); + return handle_stream_start_stop(hdr.size); } throw std::runtime_error("UNKNOWN msg of type " + std::to_string(hdr.type)); } -static int read_command(int streamfd, bool blocking) +int Stream::read_command(bool blocking) { int timeout = blocking?-1:0; while (!quit_requested) { - if (!have_something_to_read(streamfd, timeout)) { + if (!have_something_to_read(timeout)) { if (!blocking) { return 0; } sleep(1); continue; } - read_command_from_device(streamfd); + read_command_from_device(); break; } return 1; } -static size_t -write_all(int fd, const void *buf, const size_t len) +size_t Stream::write_all(const void *buf, const size_t len) { size_t written = 0; while (written < len) { - int l = write(fd, (const char *) buf + written, len - written); + int l = write(streamfd, (const char *) buf + written, len - written); if (l < 0) { if (errno == EINTR) { continue; @@ -229,7 +239,7 @@ write_all(int fd, const void *buf, const size_t len) return written; } -static int spice_stream_send_format(int streamfd, unsigned w, unsigned h, uint8_t c) +int Stream::send_format(unsigned w, unsigned h, uint8_t c) { const size_t msgsize = sizeof(FormatMessage); const size_t hdrsize = sizeof(StreamDevHeader); @@ -248,14 +258,14 @@ static int spice_stream_send_format(int streamfd, unsigned w, unsigned h, uint8_ } }; syslog(LOG_DEBUG, "writing format\n"); - std::lock_guard<std::mutex> stream_guard(stream_mtx); - if (write_all(streamfd, &msg, msgsize) != msgsize) { + std::lock_guard<std::mutex> stream_guard(mutex); + if (write_all(&msg, msgsize) != msgsize) { return EXIT_FAILURE; } return EXIT_SUCCESS; } -static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned size) +int Stream::send_frame(const void *buf, const unsigned size) { ssize_t n; const size_t msgsize = sizeof(FormatMessage); @@ -269,8 +279,8 @@ static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned .msg = {} }; - 
std::lock_guard<std::mutex> stream_guard(stream_mtx); - n = write_all(streamfd, &msg, msgsize); + std::lock_guard<std::mutex> stream_guard(mutex); + n = write_all(&msg, msgsize); syslog(LOG_DEBUG, "wrote %ld bytes of header of data msg with frame of size %u bytes\n", n, msg.hdr.size); @@ -279,7 +289,7 @@ static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned n, msgsize); return EXIT_FAILURE; } - n = write_all(streamfd, buf, size); + n = write_all(buf, size); syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n); if (n != size) { syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n", @@ -333,11 +343,10 @@ static void usage(const char *progname) exit(1); } -static void -send_cursor(int streamfd, - uint16_t width, uint16_t height, - uint16_t hotspot_x, uint16_t hotspot_y, - std::function<void(uint32_t *)> fill_cursor) +void +Stream::send_cursor(uint16_t width, uint16_t height, + uint16_t hotspot_x, uint16_t hotspot_y, + std::function<void(uint32_t *)> fill_cursor) { if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH || height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT) { return; @@ -370,11 +379,11 @@ send_cursor(int streamfd, uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg->msg.data); fill_cursor(pixels); - std::lock_guard<std::mutex> stream_guard(stream_mtx); - write_all(streamfd, storage.get(), msgsize); + std::lock_guard<std::mutex> stream_guard(mutex); + write_all(storage.get(), msgsize); } -static void cursor_changes(int streamfd, Display *display, int event_base) +static void cursor_changes(Stream *stream, Display *display, int event_base) { unsigned long last_serial = 0; @@ -399,18 +408,18 @@ static void cursor_changes(int streamfd, Display *display, int event_base) for (unsigned i = 0; i < cursor->width * cursor->height; ++i) pixels[i] = cursor->pixels[i]; }; - send_cursor(streamfd, - cursor->width, cursor->height, cursor->xhot, cursor->yhot, fill_cursor); + stream->send_cursor(cursor->width, cursor->height, + 
cursor->xhot, cursor->yhot, fill_cursor); } } static void -do_capture(int streamfd, const char *streamport, FILE *f_log) +do_capture(Stream &stream, const char *streamport, FILE *f_log) { unsigned int frame_count = 0; while (!quit_requested) { while (!quit_requested && !streaming_requested) { - if (read_command(streamfd, true) < 0) { + if (stream.read_command(true) < 0) { syslog(LOG_ERR, "FAILED to read command\n"); return; } @@ -455,7 +464,7 @@ do_capture(int streamfd, const char *streamport, FILE *f_log) syslog(LOG_DEBUG, "wXh %uX%u codec=%u\n", width, height, codec); - if (spice_stream_send_format(streamfd, width, height, codec) == EXIT_FAILURE) { + if (stream.send_format(width, height, codec) == EXIT_FAILURE) { throw std::runtime_error("FAILED to send format message"); } } @@ -468,13 +477,12 @@ do_capture(int streamfd, const char *streamport, FILE *f_log) hexdump(frame.buffer, frame.buffer_size, f_log); } } - if (spice_stream_send_frame(streamfd, - frame.buffer, frame.buffer_size) == EXIT_FAILURE) { + if (stream.send_frame(frame.buffer, frame.buffer_size) == EXIT_FAILURE) { syslog(LOG_ERR, "FAILED to send a frame\n"); break; } //usleep(1); - if (read_command(streamfd, false) < 0) { + if (stream.read_command(false) < 0) { syslog(LOG_ERR, "FAILED to read command\n"); return; } @@ -576,12 +584,12 @@ int main(int argc, char* argv[]) XFixesSelectCursorInput(display, rootwindow, XFixesDisplayCursorNotifyMask); Stream stream(streamport); - std::thread cursor_th(cursor_changes, stream.file_descriptor(), display, event_base); + std::thread cursor_th(cursor_changes, &stream, display, event_base); cursor_th.detach(); int ret = EXIT_SUCCESS; try { - do_capture(stream.file_descriptor(), streamport, f_log); + do_capture(stream, streamport, f_log); } catch (std::runtime_error &err) { syslog(LOG_ERR, "%s\n", err.what()); -- 2.13.5 (Apple Git-94) _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx 
https://lists.freedesktop.org/mailman/listinfo/spice-devel