> On 1 Mar 2018, at 16:11, Lukáš Hrázký <lhrazky@xxxxxxxxxx> wrote: > > On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote: >> From: Christophe de Dinechin <dinechin@xxxxxxxxxx> >> >> Doing this change will make it possible to move the capture loop to the >> concrete-agent.cpp file. >> >> Signed-off-by: Christophe de Dinechin <dinechin@xxxxxxxxxx> >> --- >> include/spice-streaming-agent/errors.hpp | 2 + >> src/Makefile.am | 2 + >> src/message.hpp | 41 ++++++ >> src/spice-streaming-agent.cpp | 209 +------------------------------ >> src/stream.cpp | 172 +++++++++++++++++++++++++ >> src/stream.hpp | 55 ++++++++ >> 6 files changed, 276 insertions(+), 205 deletions(-) >> create mode 100644 src/message.hpp >> create mode 100644 src/stream.cpp >> create mode 100644 src/stream.hpp >> >> diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp >> index 870a0fd..62ae010 100644 >> --- a/include/spice-streaming-agent/errors.hpp >> +++ b/include/spice-streaming-agent/errors.hpp >> @@ -90,4 +90,6 @@ protected: >> >> }} // namespace spice::streaming_agent >> >> +extern bool quit_requested; > > Putting quit_requested into errors.hpp? Why? Because errors.hpp deals with error conditions. You need to quit other threads on signals or exceptions. See https://gitlab.com/c3d/spice-streaming-agent/commit/07b0e0ea9317fab3867fb29d4367be8d4ad8ba98. > >> + >> #endif // SPICE_STREAMING_AGENT_ERRORS_HPP >> diff --git a/src/Makefile.am b/src/Makefile.am >> index 2507844..923a103 100644 >> --- a/src/Makefile.am >> +++ b/src/Makefile.am >> @@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \ >> mjpeg-fallback.hpp \ >> jpeg.cpp \ >> jpeg.hpp \ >> + stream.cpp \ >> + stream.hpp \ >> errors.cpp \ >> $(NULL) >> diff --git a/src/message.hpp b/src/message.hpp >> new file mode 100644 >> index 0000000..28b3e28 >> --- /dev/null >> +++ b/src/message.hpp >> @@ -0,0 +1,41 @@ >> +/* Formatting messages >> + * >> + * \copyright >> + * Copyright 2018 Red Hat Inc. All rights reserved. >> + */ >> +#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP >> +#define SPICE_STREAMING_AGENT_MESSAGE_HPP >> + >> +#include <spice/stream-device.h> >> + >> +namespace spice >> +{ >> +namespace streaming_agent >> +{ >> + >> +template <typename Payload, typename Info, unsigned Type> >> +class Message >> +{ >> +public: >> + template <typename ...PayloadArgs> >> + Message(PayloadArgs... payload) >> + : hdr(StreamDevHeader { >> + .protocol_version = STREAM_DEVICE_PROTOCOL, >> + .padding = 0, // Workaround GCC bug "sorry: not implemented" >> + .type = Type, >> + .size = (uint32_t) Info::size(payload...) >> + }) >> + { } >> + void write_header(Stream &stream) >> + { >> + stream.write_all("header", &hdr, sizeof(hdr)); >> + } >> + >> +protected: >> + StreamDevHeader hdr; >> + typedef Payload payload_t; >> +}; >> + >> +}} // namespace spice::streaming_agent >> + >> +#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP >> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp >> index 35e65bb..c401a34 100644 >> --- a/src/spice-streaming-agent.cpp >> +++ b/src/spice-streaming-agent.cpp >> @@ -5,6 +5,8 @@ >> */ >> >> #include "concrete-agent.hpp" >> +#include "stream.hpp" >> +#include "message.hpp" >> #include "hexdump.h" >> #include "mjpeg-fallback.hpp" >> >> @@ -21,11 +23,9 @@ >> #include <inttypes.h> >> #include <string.h> >> #include <getopt.h> >> -#include <unistd.h> >> #include <errno.h> >> -#include <fcntl.h> >> +#include <unistd.h> >> #include <sys/time.h> >> -#include <poll.h> >> #include <syslog.h> >> #include <signal.h> >> #include <exception> >> @@ -57,76 +57,6 @@ static uint64_t get_time(void) >> >> } >> >> -class Stream >> -{ >> - typedef std::set<SpiceVideoCodecType> codecs_t; >> - >> -public: >> - Stream(const char *name) >> - : codecs() >> - { >> - streamfd = open(name, O_RDWR); >> - if (streamfd < 0) { >> - throw IOError("failed to open streaming device", errno); >> - } >> - } >> - ~Stream() >> - { >> - close(streamfd); >> - } >> - >> - const codecs_t &client_codecs() { return codecs; } >> - bool streaming_requested() { return is_streaming; } >> - >> - template <typename Message, typename ...PayloadArgs> >> - void send(PayloadArgs... payload) >> - { >> - Message message(payload...); >> - std::lock_guard<std::mutex> stream_guard(mutex); >> - message.write_header(*this); >> - message.write_message_body(*this, payload...); >> - } >> - >> - int read_command(bool blocking); >> - void write_all(const char *operation, const void *buf, const size_t len); >> - >> -private: >> - int have_something_to_read(int timeout); >> - void handle_stream_start_stop(uint32_t len); >> - void handle_stream_capabilities(uint32_t len); >> - void handle_stream_error(uint32_t len); >> - void read_command_from_device(void); >> - >> -private: >> - std::mutex mutex; >> - codecs_t codecs; >> - int streamfd = -1; >> - bool is_streaming = false; >> -}; >> - >> -template <typename Payload, typename Info, unsigned Type> >> -class Message >> -{ >> -public: >> - template <typename ...PayloadArgs> >> - Message(PayloadArgs... payload) >> - : hdr(StreamDevHeader { >> - .protocol_version = STREAM_DEVICE_PROTOCOL, >> - .padding = 0, // Workaround GCC bug "sorry: not implemented" >> - .type = Type, >> - .size = (uint32_t) Info::size(payload...) >> - }) >> - { } >> - void write_header(Stream &stream) >> - { >> - stream.write_all("header", &hdr, sizeof(hdr)); >> - } >> - >> -protected: >> - StreamDevHeader hdr; >> - typedef Payload payload_t; >> -}; >> - >> class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT> >> { >> public: >> @@ -156,20 +86,6 @@ public: >> } >> }; >> >> -class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES> >> -{ >> -public: >> - CapabilitiesMessage() : Message() {} >> - static size_t size() >> - { >> - return sizeof(payload_t); >> - } >> - void write_message_body(Stream &stream) >> - { >> - /* No body for capabilities message */ >> - } >> -}; >> - >> class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET> >> { >> public: >> @@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream &stream) >> >> }} // namespace spice::streaming_agent >> >> -static bool quit_requested = false; >> - >> -int Stream::have_something_to_read(int timeout) >> -{ >> - struct pollfd pollfd = {streamfd, POLLIN, 0}; >> - >> - if (poll(&pollfd, 1, timeout) < 0) { >> - syslog(LOG_ERR, "poll FAILED\n"); >> - return -1; >> - } >> - >> - if (pollfd.revents == POLLIN) { >> - return 1; >> - } >> - >> - return 0; >> -} >> - >> -void Stream::handle_stream_start_stop(uint32_t len) >> -{ >> - uint8_t msg[256]; >> - >> - if (len >= sizeof(msg)) { >> - throw MessageDataError("message is too long", len, sizeof(msg)); >> - } >> - int n = read(streamfd, &msg, len); >> - if (n != (int) len) { >> - throw MessageDataError("read start/stop command from device failed", n, len, errno); >> - } >> - is_streaming = (msg[0] != 0); /* num_codecs */ >> - syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n", >> - is_streaming ? "START" : "STOP"); >> - codecs.clear(); >> - for (int i = 1; i <= msg[0]; ++i) { >> - codecs.insert((SpiceVideoCodecType) msg[i]); >> - } >> -} >> - >> -void Stream::handle_stream_capabilities(uint32_t len) >> -{ >> - uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES]; >> - >> - if (len > sizeof(caps)) { >> - throw MessageDataError("capability message too long", len, sizeof(caps)); >> - } >> - int n = read(streamfd, caps, len); >> - if (n != (int) len) { >> - throw MessageDataError("read capabilities from device failed", n, len, errno); >> - } >> - >> - // we currently do not support extensions so just reply so >> - send<CapabilitiesMessage>(); >> -} >> - >> -void Stream::handle_stream_error(uint32_t len) >> -{ >> - // TODO read message and use it >> - throw ProtocolError("got an error message from server"); >> -} >> - >> -void Stream::read_command_from_device() >> -{ >> - StreamDevHeader hdr; >> - int n; >> - >> - std::lock_guard<std::mutex> stream_guard(mutex); >> - n = read(streamfd, &hdr, sizeof(hdr)); >> - if (n != sizeof(hdr)) { >> - throw MessageDataError("read command from device failed", n, sizeof(hdr), errno); >> - } >> - if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) { >> - throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL); >> - } >> - >> - switch (hdr.type) { >> - case STREAM_TYPE_CAPABILITIES: >> - return handle_stream_capabilities(hdr.size); >> - case STREAM_TYPE_NOTIFY_ERROR: >> - return handle_stream_error(hdr.size); >> - case STREAM_TYPE_START_STOP: >> - return handle_stream_start_stop(hdr.size); >> - } >> - throw MessageDataError("unknown message type", hdr.type, 0); >> -} >> - >> -int Stream::read_command(bool blocking) >> -{ >> - int timeout = blocking?-1:0; >> - while (!quit_requested) { >> - if (!have_something_to_read(timeout)) { >> - if (!blocking) { >> - return 0; >> - } >> - sleep(1); >> - continue; >> - } >> - read_command_from_device(); >> - break; >> - } >> - >> - return 1; >> -} >> - >> -void Stream::write_all(const char *operation, const void *buf, const size_t len) >> -{ >> - size_t written = 0; >> - while (written < len) { >> - int l = write(streamfd, (const char *) buf + written, len - written); >> - if (l < 0) { >> - if (errno == EINTR) { >> - continue; >> - } >> - throw WriteError("write failed", operation, errno).syslog(); >> - } >> - written += l; >> - } >> - syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written); >> -} >> +bool quit_requested = false; >> >> static void handle_interrupt(int intr) >> { >> diff --git a/src/stream.cpp b/src/stream.cpp >> new file mode 100644 >> index 0000000..f756097 >> --- /dev/null >> +++ b/src/stream.cpp >> @@ -0,0 +1,172 @@ >> +/* Encapsulation of the stream used to communicate between agent and server >> + * >> + * \copyright >> + * Copyright 2018 Red Hat Inc. All rights reserved. >> + */ >> + >> +#include "stream.hpp" >> +#include "message.hpp" >> + >> +#include <spice/stream-device.h> >> + >> +#include <spice-streaming-agent/errors.hpp> >> + >> +#include <sys/types.h> >> +#include <sys/stat.h> >> +#include <fcntl.h> >> +#include <poll.h> >> +#include <syslog.h> >> +#include <unistd.h> >> + >> +namespace spice >> +{ >> +namespace streaming_agent >> +{ >> + >> +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, >> + STREAM_TYPE_CAPABILITIES> >> +{ >> +public: >> + CapabilitiesMessage() : Message() {} >> + static size_t size() >> + { >> + return sizeof(payload_t); >> + } >> + void write_message_body(Stream &stream) >> + { >> + /* No body for capabilities message */ >> + } >> +}; > > Not sure I like scattering the messages across source files that happen > to use them, though I suppose you did it because each message (like the > X11Cursor) may require different header files included? Perhaps it is > the way to go… No, it’s really to de-couple things, a good way to check if encapsulation was correct. > >> + >> +Stream::Stream(const char *name) >> + : codecs() >> +{ >> + streamfd = open(name, O_RDWR); >> + if (streamfd < 0) { >> + throw IOError("failed to open streaming device", errno); >> + } >> +} >> + >> +Stream::~Stream() >> +{ >> + close(streamfd); >> +} >> + >> +int Stream::have_something_to_read(int timeout) >> +{ >> + struct pollfd pollfd = {streamfd, POLLIN, 0}; >> + >> + if (poll(&pollfd, 1, timeout) < 0) { >> + syslog(LOG_ERR, "poll FAILED\n"); >> + return -1; >> + } >> + >> + if (pollfd.revents == POLLIN) { >> + return 1; >> + } >> + >> + return 0; >> +} >> + >> +void Stream::handle_stream_start_stop(uint32_t len) >> +{ >> + uint8_t msg[256]; >> + >> + if (len >= sizeof(msg)) { >> + throw MessageDataError("message is too long", len, sizeof(msg)); >> + } >> + int n = read(streamfd, &msg, len); >> + if (n != (int) len) { >> + throw MessageDataError("read start/stop command from device failed", n, len, errno); >> + } >> + is_streaming = (msg[0] != 0); /* num_codecs */ >> + syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n", >> + is_streaming ? "START" : "STOP"); >> + codecs.clear(); >> + for (int i = 1; i <= msg[0]; ++i) { >> + codecs.insert((SpiceVideoCodecType) msg[i]); >> + } >> +} >> + >> +void Stream::handle_stream_capabilities(uint32_t len) >> +{ >> + uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES]; >> + >> + if (len > sizeof(caps)) { >> + throw MessageDataError("capability message too long", len, sizeof(caps)); >> + } >> + int n = read(streamfd, caps, len); >> + if (n != (int) len) { >> + throw MessageDataError("read capabilities from device failed", n, len, errno); >> + } >> + >> + // we currently do not support extensions so just reply so >> + send<CapabilitiesMessage>(); >> +} >> + >> +void Stream::handle_stream_error(uint32_t len) >> +{ >> + // TODO read message and use it >> + throw ProtocolError("got an error message from server"); >> +} >> + >> +void Stream::read_command_from_device() >> +{ >> + StreamDevHeader hdr; >> + int n; >> + >> + std::lock_guard<std::mutex> stream_guard(mutex); >> + n = read(streamfd, &hdr, sizeof(hdr)); >> + if (n != sizeof(hdr)) { >> + throw MessageDataError("read command from device failed", n, sizeof(hdr), errno); >> + } >> + if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) { >> + throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL); >> + } >> + >> + switch (hdr.type) { >> + case STREAM_TYPE_CAPABILITIES: >> + return handle_stream_capabilities(hdr.size); >> + case STREAM_TYPE_NOTIFY_ERROR: >> + return handle_stream_error(hdr.size); >> + case STREAM_TYPE_START_STOP: >> + return handle_stream_start_stop(hdr.size); >> + } >> + throw MessageDataError("unknown message type", hdr.type, 0); >> +} >> + >> +int Stream::read_command(bool blocking) >> +{ >> + int timeout = blocking?-1:0; >> + while (!quit_requested) { >> + if (!have_something_to_read(timeout)) { >> + if (!blocking) { >> + return 0; >> + } >> + sleep(1); >> + continue; >> + } >> + read_command_from_device(); >> + break; >> + } >> + >> + return 1; >> +} >> + >> +void Stream::write_all(const char *operation, const void *buf, const size_t len) >> +{ >> + size_t written = 0; >> + while (written < len) { >> + int l = write(streamfd, (const char *) buf + written, len - written); >> + if (l < 0) { >> + if (errno == EINTR) { >> + continue; >> + } >> + throw WriteError("write failed", operation, errno).syslog(); >> + } >> + written += l; >> + } >> + syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written); >> +} >> + >> +}} // namespace spice::streaming_agent >> diff --git a/src/stream.hpp b/src/stream.hpp >> new file mode 100644 >> index 0000000..b689f36 >> --- /dev/null >> +++ b/src/stream.hpp >> @@ -0,0 +1,55 @@ >> +/* Encapsulation of the stream used to communicate between agent and server >> + * >> + * \copyright >> + * Copyright 2018 Red Hat Inc. All rights reserved. >> + */ >> +#ifndef SPICE_STREAMING_AGENT_STREAM_HPP >> +#define SPICE_STREAMING_AGENT_STREAM_HPP >> + >> +#include <spice/enums.h> >> +#include <set> >> +#include <mutex> >> + >> +namespace spice { >> +namespace streaming_agent { >> + >> +class Stream >> +{ >> + typedef std::set<SpiceVideoCodecType> codecs_t; >> + >> +public: >> + Stream(const char *name); >> + ~Stream(); >> + >> + const codecs_t &client_codecs() { return codecs; } >> + bool streaming_requested() { return is_streaming; } >> + >> + template <typename Message, typename ...PayloadArgs> >> + void send(PayloadArgs... payload) >> + { >> + Message message(payload...); >> + std::lock_guard<std::mutex> stream_guard(mutex); >> + message.write_header(*this); >> + message.write_message_body(*this, payload...); >> + } >> + >> + int read_command(bool blocking); >> + void write_all(const char *operation, const void *buf, const size_t len); >> + >> +private: >> + int have_something_to_read(int timeout); >> + void handle_stream_start_stop(uint32_t len); >> + void handle_stream_capabilities(uint32_t len); >> + void handle_stream_error(uint32_t len); >> + void read_command_from_device(void); >> + >> +private: >> + std::mutex mutex; >> + codecs_t codecs; >> + int streamfd = -1; >> + bool is_streaming = false; >> +}; >> + >> +}} // namespace spice::streaming_agent >> + >> +#endif // SPICE_STREAMING_AGENT_ERRORS_HPP _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel