> > Heavily based on code by Christophe de Dinechin. > > Wraps the serialization code in the OutboundMessage class and its > descendants for each specific message. Uses Curiously Recurring Template > Pattern (CRTP) to avoid runtime overhead of polymorphism. > > The messages are placed along with the code that sends them, this helps > to avoid header proliferation, e.g. the CursorMessage requires X11 > headers for its interface. > > Signed-off-by: Lukáš Hrázký <lhrazky@xxxxxxxxxx> > --- > src/cursor-updater.cpp | 128 ++++++++++++++++++++-------------- > src/spice-streaming-agent.cpp | 97 ++++++++++++-------------- > src/stream-port.hpp | 35 ++++++++++ > 3 files changed, 154 insertions(+), 106 deletions(-) > > diff --git a/src/cursor-updater.cpp b/src/cursor-updater.cpp > index 8f65e83..9edb010 100644 > --- a/src/cursor-updater.cpp > +++ b/src/cursor-updater.cpp > @@ -12,52 +12,62 @@ > #include <spice/stream-device.h> > #include <spice/enums.h> > > -#include <cstring> > -#include <functional> > #include <memory> > +#include <vector> > +#include <unistd.h> > #include <X11/extensions/Xfixes.h> > > > namespace spice { > namespace streaming_agent { > > -namespace { > - > -void send_cursor(StreamPort &stream_port, unsigned width, unsigned height, > int hotspot_x, int hotspot_y, > - std::function<void(uint32_t *)> fill_cursor) > +class CursorError : public Error > { > - if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH || height >= > STREAM_MSG_CURSOR_SET_MAX_HEIGHT) { > - return; > - } > - > - size_t cursor_size = > - sizeof(StreamDevHeader) + sizeof(StreamMsgCursorSet) + > - width * height * sizeof(uint32_t); > - std::unique_ptr<uint8_t[]> msg(new uint8_t[cursor_size]); > + using Error::Error; > +}; > > - StreamDevHeader > &dev_hdr(*reinterpret_cast<StreamDevHeader*>(msg.get())); > - memset(&dev_hdr, 0, sizeof(dev_hdr)); > - dev_hdr.protocol_version = STREAM_DEVICE_PROTOCOL; > - dev_hdr.type = STREAM_TYPE_CURSOR_SET; > - dev_hdr.size = cursor_size - sizeof(StreamDevHeader); 
> - > - StreamMsgCursorSet &cursor_msg(*reinterpret_cast<StreamMsgCursorSet > *>(msg.get() + sizeof(StreamDevHeader))); > - memset(&cursor_msg, 0, sizeof(cursor_msg)); > +class CursorMessage : public OutboundMessage<StreamMsgCursorSet, > CursorMessage, STREAM_TYPE_CURSOR_SET> > +{ > +public: > + CursorMessage(uint16_t width, uint16_t height, uint16_t xhot, uint16_t > yhot, > + const std::vector<uint32_t> &pixels) > + : > + OutboundMessage(pixels) > + { > + if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH) { > + throw CursorError("Cursor width " + std::to_string(width) + > + " too big (limit is " + > std::to_string(STREAM_MSG_CURSOR_SET_MAX_WIDTH) + ")"); > + } > > - cursor_msg.type = SPICE_CURSOR_TYPE_ALPHA; > - cursor_msg.width = width; > - cursor_msg.height = height; > - cursor_msg.hot_spot_x = hotspot_x; > - cursor_msg.hot_spot_y = hotspot_y; > + if (height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT) { > + throw CursorError("Cursor height " + std::to_string(height) + > + " too big (limit is " + > std::to_string(STREAM_MSG_CURSOR_SET_MAX_HEIGHT) + ")"); > + } > + } > > - uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg.data); > - fill_cursor(pixels); > + static size_t size(const std::vector<uint32_t> &pixels) > + { > + return sizeof(PayloadType) + sizeof(uint32_t) * pixels.size(); > + } > > - std::lock_guard<std::mutex> guard(stream_port.mutex); > - stream_port.write(msg.get(), cursor_size); > -} > + void write_message_body(StreamPort &stream_port, > + uint16_t width, uint16_t height, uint16_t xhot, uint16_t yhot, > + const std::vector<uint32_t> &pixels) > + { > + StreamMsgCursorSet msg = { > + .width = width, > + .height = height, > + .hot_spot_x = xhot, > + .hot_spot_y = yhot, > + .type = SPICE_CURSOR_TYPE_ALPHA, > + .padding1 = {}, > + .data = {} > + }; Do not use C++20 features (designated initializers); we agreed on C++11. 
> > -} // namespace > + stream_port.write(&msg, sizeof(msg)); > + stream_port.write(pixels.data(), sizeof(uint32_t) * pixels.size()); > + } > +}; > > CursorUpdater::CursorUpdater(StreamPort *stream_port) : > stream_port(stream_port) > { > @@ -79,27 +89,39 @@ CursorUpdater::CursorUpdater(StreamPort *stream_port) : > stream_port(stream_port) > unsigned long last_serial = 0; > > while (1) { > - XEvent event; > - XNextEvent(display, &event); > - if (event.type != xfixes_event_base + 1) { > - continue; > - } > - > - XFixesCursorImage *cursor = XFixesGetCursorImage(display); > - if (!cursor) { > - continue; > + try { > + XEvent event; > + XNextEvent(display, &event); > + if (event.type != xfixes_event_base + 1) { > + continue; > + } > + > + XFixesCursorImage *cursor = XFixesGetCursorImage(display); > + if (!cursor) { > + continue; > + } > + > + if (cursor->cursor_serial == last_serial) { > + continue; > + } > + > + last_serial = cursor->cursor_serial; > + > + // the X11 cursor data may be in a wrong format, copy them to an > uint32_t array > + size_t pixcount = cursor->width * cursor->height; > + std::vector<uint32_t> pixels; > + pixels.reserve(pixcount); > + > + for (size_t i = 0; i < pixcount; ++i) { > + pixels.push_back(cursor->pixels[i]); > + } > + > + stream_port->send<CursorMessage>(cursor->width, cursor->height, > + cursor->xhot, cursor->yhot, > pixels); > + } catch (const std::exception &e) { > + ::syslog(LOG_ERR, "Error in cursor updater thread: %s", > e.what()); > + sleep(1); // rate-limit the error > } > - > - if (cursor->cursor_serial == last_serial) { > - continue; > - } > - > - last_serial = cursor->cursor_serial; > - auto fill_cursor = [cursor](uint32_t *pixels) { > - for (unsigned i = 0; i < cursor->width * cursor->height; ++i) > - pixels[i] = cursor->pixels[i]; > - }; > - send_cursor(*stream_port, cursor->width, cursor->height, > cursor->xhot, cursor->yhot, fill_cursor); > } > } > > diff --git a/src/spice-streaming-agent.cpp 
b/src/spice-streaming-agent.cpp > index a89ba3f..39c53bd 100644 > --- a/src/spice-streaming-agent.cpp > +++ b/src/spice-streaming-agent.cpp > @@ -41,16 +41,51 @@ using namespace spice::streaming_agent; > > static ConcreteAgent agent; > > -struct SpiceStreamFormatMessage > +class FormatMessage : public OutboundMessage<StreamMsgFormat, FormatMessage, > STREAM_TYPE_FORMAT> > { > - StreamDevHeader hdr; > - StreamMsgFormat msg; > +public: > + FormatMessage(unsigned w, unsigned h, uint8_t c) {} > + > + static size_t size() > + { > + return sizeof(PayloadType); > + } > + > + void write_message_body(StreamPort &stream_port, unsigned w, unsigned h, > uint8_t c) > + { > + StreamMsgFormat msg = { .width = w, .height = h, .codec = c, > .padding1 = {} }; > + stream_port.write(&msg, sizeof(msg)); > + } > +}; > + > +class FrameMessage : public OutboundMessage<StreamMsgData, FrameMessage, > STREAM_TYPE_DATA> > +{ > +public: > + FrameMessage(const void *frame, size_t length) : OutboundMessage(length) > {} > + > + static size_t size(size_t length) > + { > + return sizeof(PayloadType) + length; > + } > + > + void write_message_body(StreamPort &stream_port, const void *frame, > size_t length) > + { > + stream_port.write(frame, length); > + } > }; > > -struct SpiceStreamDataMessage > +class CapabilitiesOutMessage : public OutboundMessage<StreamMsgCapabilities, > CapabilitiesOutMessage, STREAM_TYPE_CAPABILITIES> > { > - StreamDevHeader hdr; > - StreamMsgData msg; > +public: > + static size_t size() > + { > + return sizeof(PayloadType); > + } > + > + void write_message_body(StreamPort &stream_port) > + { > + // No body for capabilities message > + } > }; > > static bool streaming_requested = false; > @@ -83,15 +118,7 @@ static void read_command_from_device(StreamPort > &stream_port) > > switch (in_message.header.type) { > case STREAM_TYPE_CAPABILITIES: { > - StreamDevHeader hdr = { > - STREAM_DEVICE_PROTOCOL, > - 0, > - STREAM_TYPE_CAPABILITIES, > - 0 > - }; > - > - 
std::lock_guard<std::mutex> guard(stream_port.mutex); > - stream_port.write(&hdr, sizeof(hdr)); > + stream_port.send<CapabilitiesOutMessage>(); > return; > } case STREAM_TYPE_NOTIFY_ERROR: { > NotifyErrorMessage msg = > in_message.get_payload<NotifyErrorMessage>(); > @@ -128,42 +155,6 @@ static void read_command(StreamPort &stream_port, bool > blocking) > } > } > > -static void spice_stream_send_format(StreamPort &stream_port, unsigned w, > unsigned h, unsigned c) > -{ > - > - SpiceStreamFormatMessage msg; > - const size_t msgsize = sizeof(msg); > - const size_t hdrsize = sizeof(msg.hdr); > - memset(&msg, 0, msgsize); > - msg.hdr.protocol_version = STREAM_DEVICE_PROTOCOL; > - msg.hdr.type = STREAM_TYPE_FORMAT; > - msg.hdr.size = msgsize - hdrsize; /* includes only the body? */ > - msg.msg.width = w; > - msg.msg.height = h; > - msg.msg.codec = c; > - > - syslog(LOG_DEBUG, "writing format"); > - std::lock_guard<std::mutex> guard(stream_port.mutex); > - stream_port.write(&msg, msgsize); > -} > - > -static void spice_stream_send_frame(StreamPort &stream_port, const void > *buf, const unsigned size) > -{ > - SpiceStreamDataMessage msg; > - const size_t msgsize = sizeof(msg); > - > - memset(&msg, 0, msgsize); > - msg.hdr.protocol_version = STREAM_DEVICE_PROTOCOL; > - msg.hdr.type = STREAM_TYPE_DATA; > - msg.hdr.size = size; /* includes only the body? 
*/ > - > - std::lock_guard<std::mutex> guard(stream_port.mutex); > - stream_port.write(&msg, msgsize); > - stream_port.write(buf, size); > - > - syslog(LOG_DEBUG, "Sent a frame of size %u", size); > -} > - > static void handle_interrupt(int intr) > { > syslog(LOG_INFO, "Got signal %d, exiting", intr); > @@ -249,13 +240,13 @@ do_capture(StreamPort &stream_port, FrameLog > &frame_log) > syslog(LOG_DEBUG, "wXh %uX%u codec=%u", width, height, > codec); > frame_log.log_stat("Started new stream wXh %uX%u codec=%u", > width, height, codec); > > - spice_stream_send_format(stream_port, width, height, codec); > + stream_port.send<FormatMessage>(width, height, codec); > } > frame_log.log_stat("Frame of %zu bytes", frame.buffer_size); > frame_log.log_frame(frame.buffer, frame.buffer_size); > > try { > - spice_stream_send_frame(stream_port, frame.buffer, > frame.buffer_size); > + stream_port.send<FrameMessage>(frame.buffer, > frame.buffer_size); > } catch (const WriteError& e) { > syslog(e); > break; > diff --git a/src/stream-port.hpp b/src/stream-port.hpp > index 090930b..6cb516b 100644 > --- a/src/stream-port.hpp > +++ b/src/stream-port.hpp > @@ -60,12 +60,47 @@ public: > > InboundMessage receive(); > > + template <typename Message, typename ...PayloadArgs> > + void send(PayloadArgs&&... payload_args) > + { > + Message message(payload_args...); > + std::lock_guard<std::mutex> stream_guard(mutex); > + message.write_header(*this); > + message.write_message_body(*this, payload_args...); > + } > + > void write(const void *buf, size_t len); > > int fd; > + > +private: OT: This looks great. Wondering if at the end also write and fd should be private. > std::mutex mutex; > }; > > +template <typename Payload, typename Message, unsigned Type> > +class OutboundMessage > +{ > +public: > + template <typename ...PayloadArgs> > + OutboundMessage(PayloadArgs&&... 
payload_args) : > + hdr(StreamDevHeader { > + .protocol_version = STREAM_DEVICE_PROTOCOL, > + .padding = 0, // Workaround GCC bug "sorry: not implemented" > + .type = Type, > + .size = (uint32_t) Message::size(payload_args...) > + }) > + {} > + > + void write_header(StreamPort &stream_port) > + { > + stream_port.write(&hdr, sizeof(hdr)); > + } > + > +protected: > + StreamDevHeader hdr; > + using PayloadType = Payload; > +}; > + > void read_all(int fd, void *buf, size_t len); > void write_all(int fd, const void *buf, size_t len); > Frediano _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel