From: Christophe de Dinechin <dinechin@xxxxxxxxxx> - The Stream class now deals with locking and sending messages - The Message<> template class deals with the general writing mechanisms - The classes FormatMessage, FrameMessage, CapabilitiesMessage and X11CursorMessage represent individual messages. The various classes should be moved to separate headers in a subsequent operation. The design uses the "curiously recurring template pattern" (CRTP) to defer some of the code to a derived class. This is done to avoid runtime overhead: all the calls are statically bound. Signed-off-by: Christophe de Dinechin <dinechin@xxxxxxxxxx> --- src/spice-streaming-agent.cpp | 279 +++++++++++++++++++----------------------- 1 file changed, 128 insertions(+), 151 deletions(-) diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp index 23ee824..8bbd457 100644 --- a/src/spice-streaming-agent.cpp +++ b/src/spice-streaming-agent.cpp @@ -48,24 +48,6 @@ namespace spice namespace streaming_agent { -struct FormatMessage -{ - StreamDevHeader hdr; - StreamMsgFormat msg; -}; - -struct DataMessage -{ - StreamDevHeader hdr; - StreamMsgData msg; -}; - -struct CursorMessage -{ - StreamDevHeader hdr; - StreamMsgCursorSet msg; -}; - class Stream { typedef std::set<SpiceVideoCodecType> codecs_t; @@ -87,13 +69,17 @@ public: const codecs_t &client_codecs() { return codecs; } bool streaming_requested() { return is_streaming; } + template <typename Message, typename ...PayloadArgs> + void send(PayloadArgs... 
payload) + { + Message message(payload...); + std::lock_guard<std::mutex> stream_guard(mutex); + message.write_header(*this); + message.write_message_body(*this, payload...); + } + int read_command(bool blocking); - size_t write_all(const void *buf, const size_t len); - int send_format(unsigned w, unsigned h, uint8_t c); - int send_frame(const void *buf, const unsigned size); - void send_cursor(uint16_t width, uint16_t height, - uint16_t hotspot_x, uint16_t hotspot_y, - std::function<void(uint32_t *)> fill_cursor); + void write_all(const char *operation, const void *buf, const size_t len); private: int have_something_to_read(int timeout); @@ -109,6 +95,117 @@ private: bool is_streaming = false; }; +template <typename Payload, typename Info, unsigned Type> +class Message +{ +public: + template <typename ...PayloadArgs> + Message(PayloadArgs... payload) + : hdr(StreamDevHeader { + .protocol_version = STREAM_DEVICE_PROTOCOL, + .padding = 0, // Workaround GCC bug "sorry: not implemented" + .type = Type, + .size = (uint32_t) Info::size(payload...) 
+ }) + { } + void write_header(Stream &stream) + { + stream.write_all("header", &hdr, sizeof(hdr)); + } + +protected: + StreamDevHeader hdr; + typedef Payload payload_t; +}; + +class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT> +{ +public: + FormatMessage(unsigned w, unsigned h, uint8_t c) : Message(w, h, c) {} + static size_t size(unsigned width, unsigned height, uint8_t codec) + { + return sizeof(payload_t); + } + void write_message_body(Stream &stream, unsigned w, unsigned h, uint8_t c) + { + StreamMsgFormat msg = { .width = w, .height = h, .codec = c, .padding1 = {} }; + stream.write_all("format", &msg, sizeof(msg)); + } +}; + +class FrameMessage : public Message<StreamMsgData, FrameMessage, STREAM_TYPE_DATA> +{ +public: + FrameMessage(const void *frame, size_t length) : Message(frame, length) {} + static size_t size(const void *frame, size_t length) + { + return sizeof(payload_t) + length; + } + void write_message_body(Stream &stream, const void *frame, size_t length) + { + stream.write_all("frame", frame, length); + } +}; + +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES> +{ +public: + CapabilitiesMessage() : Message() {} + static size_t size() + { + return sizeof(payload_t); + } + void write_message_body(Stream &stream) + { + /* No body for capabilities message */ + } +}; + +class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET> +{ +public: + X11CursorMessage(XFixesCursorImage *cursor): Message(cursor) {} + static size_t size(XFixesCursorImage *cursor) + { + return sizeof(payload_t) + sizeof(uint32_t) * pixel_count(cursor); + } + + void write_message_body(Stream &stream, XFixesCursorImage *cursor) + { + StreamMsgCursorSet msg = { + .width = cursor->width, + .height = cursor->height, + .hot_spot_x = cursor->xhot, + .hot_spot_y = cursor->yhot, + .type = SPICE_CURSOR_TYPE_ALPHA, + .padding1 = { }, + .data = { } + }; + + 
size_t pixcount = pixel_count(cursor); + size_t pixsize = pixcount * sizeof(uint32_t); + std::unique_ptr<uint32_t[]> pixels(new uint32_t[pixcount]); + uint32_t *pixbuf = pixels.get(); + fill_pixels(cursor, pixcount, pixbuf); + + stream.write_all("cursor message", &msg, sizeof(msg)); + stream.write_all("cursor pixels", pixbuf, pixsize); + } + +private: + static size_t pixel_count(XFixesCursorImage *cursor) + { + return cursor->width * cursor->height; + } + + void fill_pixels(XFixesCursorImage *cursor, unsigned count, uint32_t *pixbuf) + { + for (unsigned i = 0; i < count; ++i) { + pixbuf[i] = cursor->pixels[i]; + } + } +}; + }} // namespace spice::streaming_agent static bool quit_requested = false; @@ -166,15 +263,7 @@ void Stream::handle_stream_capabilities(uint32_t len) } // we currently do not support extensions so just reply so - StreamDevHeader hdr = { - STREAM_DEVICE_PROTOCOL, - 0, - STREAM_TYPE_CAPABILITIES, - 0 - }; - if (write_all(&hdr, sizeof(hdr)) != sizeof(hdr)) { - throw std::runtime_error("error writing capabilities"); - } + send<CapabilitiesMessage>(); } void Stream::handle_stream_error(uint32_t len) @@ -228,7 +317,7 @@ int Stream::read_command(bool blocking) return 1; } -size_t Stream::write_all(const void *buf, const size_t len) +void Stream::write_all(const char *operation, const void *buf, const size_t len) { size_t written = 0; while (written < len) { @@ -237,73 +326,11 @@ size_t Stream::write_all(const void *buf, const size_t len) if (errno == EINTR) { continue; } - syslog(LOG_ERR, "write failed - %m"); - return l; + throw WriteError("write failed", operation, errno).syslog(); } written += l; } - syslog(LOG_DEBUG, "write_all -- %u bytes written\n", (unsigned)written); - return written; -} - -int Stream::send_format(unsigned w, unsigned h, uint8_t c) -{ - const size_t msgsize = sizeof(FormatMessage); - const size_t hdrsize = sizeof(StreamDevHeader); - FormatMessage msg = { - .hdr = { - .protocol_version = STREAM_DEVICE_PROTOCOL, - .padding = 0, 
// Workaround GCC "not implemented" bug - .type = STREAM_TYPE_FORMAT, - .size = msgsize - hdrsize - }, - .msg = { - .width = w, - .height = h, - .codec = c, - .padding1 = { } - } - }; - syslog(LOG_DEBUG, "writing format\n"); - std::lock_guard<std::mutex> stream_guard(mutex); - if (write_all(&msg, msgsize) != msgsize) { - return EXIT_FAILURE; - } - return EXIT_SUCCESS; -} - -int Stream::send_frame(const void *buf, const unsigned size) -{ - ssize_t n; - const size_t msgsize = sizeof(FormatMessage); - DataMessage msg = { - .hdr = { - .protocol_version = STREAM_DEVICE_PROTOCOL, - .padding = 0, // Workaround GCC "not implemented" bug - .type = STREAM_TYPE_DATA, - .size = size /* includes only the body? */ - }, - .msg = {} - }; - - std::lock_guard<std::mutex> stream_guard(mutex); - n = write_all(&msg, msgsize); - syslog(LOG_DEBUG, - "wrote %ld bytes of header of data msg with frame of size %u bytes\n", - n, msg.hdr.size); - if (n != msgsize) { - syslog(LOG_WARNING, "write_all header: wrote %ld expected %lu\n", - n, msgsize); - return EXIT_FAILURE; - } - n = write_all(buf, size); - syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n); - if (n != size) { - syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n", - n, size); - return EXIT_FAILURE; - } - return EXIT_SUCCESS; + syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written); } /* returns current time in micro-seconds */ @@ -350,46 +377,6 @@ static void usage(const char *progname) exit(1); } -void -Stream::send_cursor(uint16_t width, uint16_t height, - uint16_t hotspot_x, uint16_t hotspot_y, - std::function<void(uint32_t *)> fill_cursor) -{ - if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH || height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT) { - return; - } - - const uint32_t msgsize = sizeof(CursorMessage) + width * height * sizeof(uint32_t); - const uint32_t hdrsize = sizeof(StreamDevHeader); - - std::unique_ptr<uint8_t[]> storage(new uint8_t[msgsize]); - - CursorMessage *cursor_msg = - new(storage.get()) 
CursorMessage { - .hdr = { - .protocol_version = STREAM_DEVICE_PROTOCOL, - .padding = 0, // Workaround GCC internal / not implemented compiler error - .type = STREAM_TYPE_CURSOR_SET, - .size = msgsize - hdrsize - }, - .msg = { - .width = width, - .height = height, - .hot_spot_x = hotspot_x, - .hot_spot_y = hotspot_y, - .type = SPICE_CURSOR_TYPE_ALPHA, - .padding1 = { }, - .data = { } - } - }; - - uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg->msg.data); - fill_cursor(pixels); - - std::lock_guard<std::mutex> stream_guard(mutex); - write_all(storage.get(), msgsize); -} - static void cursor_changes(Stream *stream, Display *display, int event_base) { unsigned long last_serial = 0; @@ -411,12 +398,7 @@ static void cursor_changes(Stream *stream, Display *display, int event_base) } last_serial = cursor->cursor_serial; - auto fill_cursor = [cursor](uint32_t *pixels) { - for (unsigned i = 0; i < cursor->width * cursor->height; ++i) - pixels[i] = cursor->pixels[i]; - }; - stream->send_cursor(cursor->width, cursor->height, - cursor->xhot, cursor->yhot, fill_cursor); + stream->send<X11CursorMessage>(cursor); } } @@ -471,9 +453,7 @@ do_capture(Stream &stream, const char *streamport, FILE *f_log) syslog(LOG_DEBUG, "wXh %uX%u codec=%u\n", width, height, codec); - if (stream.send_format(width, height, codec) == EXIT_FAILURE) { - throw std::runtime_error("FAILED to send format message"); - } + stream.send<FormatMessage>(width, height, codec); } if (f_log) { if (log_binary) { @@ -484,10 +464,7 @@ do_capture(Stream &stream, const char *streamport, FILE *f_log) hexdump(frame.buffer, frame.buffer_size, f_log); } } - if (stream.send_frame(frame.buffer, frame.buffer_size) == EXIT_FAILURE) { - syslog(LOG_ERR, "FAILED to send a frame\n"); - break; - } + stream.send<FrameMessage>(frame.buffer, frame.buffer_size); //usleep(1); if (stream.read_command(false) < 0) { syslog(LOG_ERR, "FAILED to read command\n"); -- 2.13.5 (Apple Git-94) 
_______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel