On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote: > From: Christophe de Dinechin <dinechin@xxxxxxxxxx> > > Doing this change will make it possible to move the capture loop to the > concrete-agent.cpp file. > > Signed-off-by: Christophe de Dinechin <dinechin@xxxxxxxxxx> > --- > include/spice-streaming-agent/errors.hpp | 2 + > src/Makefile.am | 2 + > src/message.hpp | 41 ++++++ > src/spice-streaming-agent.cpp | 209 +------------------------------ > src/stream.cpp | 172 +++++++++++++++++++++++++ > src/stream.hpp | 55 ++++++++ > 6 files changed, 276 insertions(+), 205 deletions(-) > create mode 100644 src/message.hpp > create mode 100644 src/stream.cpp > create mode 100644 src/stream.hpp > > diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp > index 870a0fd..62ae010 100644 > --- a/include/spice-streaming-agent/errors.hpp > +++ b/include/spice-streaming-agent/errors.hpp > @@ -90,4 +90,6 @@ protected: > > }} // namespace spice::streaming_agent > > +extern bool quit_requested; Putting quit_requested into errors.hpp? Why? > + > #endif // SPICE_STREAMING_AGENT_ERRORS_HPP > diff --git a/src/Makefile.am b/src/Makefile.am > index 2507844..923a103 100644 > --- a/src/Makefile.am > +++ b/src/Makefile.am > @@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \ > mjpeg-fallback.hpp \ > jpeg.cpp \ > jpeg.hpp \ > + stream.cpp \ > + stream.hpp \ > errors.cpp \ > $(NULL) > diff --git a/src/message.hpp b/src/message.hpp > new file mode 100644 > index 0000000..28b3e28 > --- /dev/null > +++ b/src/message.hpp > @@ -0,0 +1,41 @@ > +/* Formatting messages > + * > + * \copyright > + * Copyright 2018 Red Hat Inc. All rights reserved. > + */ > +#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP > +#define SPICE_STREAMING_AGENT_MESSAGE_HPP > + > +#include <spice/stream-device.h> > + > +namespace spice > +{ > +namespace streaming_agent > +{ > + > +template <typename Payload, typename Info, unsigned Type> > +class Message > +{ > +public: > + template <typename ...PayloadArgs> > + Message(PayloadArgs... payload) > + : hdr(StreamDevHeader { > + .protocol_version = STREAM_DEVICE_PROTOCOL, > + .padding = 0, // Workaround GCC bug "sorry: not implemented" > + .type = Type, > + .size = (uint32_t) Info::size(payload...) > + }) > + { } > + void write_header(Stream &stream) > + { > + stream.write_all("header", &hdr, sizeof(hdr)); > + } > + > +protected: > + StreamDevHeader hdr; > + typedef Payload payload_t; > +}; > + > +}} // namespace spice::streaming_agent > + > +#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP > diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp > index 35e65bb..c401a34 100644 > --- a/src/spice-streaming-agent.cpp > +++ b/src/spice-streaming-agent.cpp > @@ -5,6 +5,8 @@ > */ > > #include "concrete-agent.hpp" > +#include "stream.hpp" > +#include "message.hpp" > #include "hexdump.h" > #include "mjpeg-fallback.hpp" > > @@ -21,11 +23,9 @@ > #include <inttypes.h> > #include <string.h> > #include <getopt.h> > -#include <unistd.h> > #include <errno.h> > -#include <fcntl.h> > +#include <unistd.h> > #include <sys/time.h> > -#include <poll.h> > #include <syslog.h> > #include <signal.h> > #include <exception> > @@ -57,76 +57,6 @@ static uint64_t get_time(void) > > } > > -class Stream > -{ > - typedef std::set<SpiceVideoCodecType> codecs_t; > - > -public: > - Stream(const char *name) > - : codecs() > - { > - streamfd = open(name, O_RDWR); > - if (streamfd < 0) { > - throw IOError("failed to open streaming device", errno); > - } > - } > - ~Stream() > - { > - close(streamfd); > - } > - > - const codecs_t &client_codecs() { return codecs; } > - bool streaming_requested() { return is_streaming; } > - > - template <typename Message, typename ...PayloadArgs> > - void send(PayloadArgs... payload) > - { > - Message message(payload...); > - std::lock_guard<std::mutex> stream_guard(mutex); > - message.write_header(*this); > - message.write_message_body(*this, payload...); > - } > - > - int read_command(bool blocking); > - void write_all(const char *operation, const void *buf, const size_t len); > - > -private: > - int have_something_to_read(int timeout); > - void handle_stream_start_stop(uint32_t len); > - void handle_stream_capabilities(uint32_t len); > - void handle_stream_error(uint32_t len); > - void read_command_from_device(void); > - > -private: > - std::mutex mutex; > - codecs_t codecs; > - int streamfd = -1; > - bool is_streaming = false; > -}; > - > -template <typename Payload, typename Info, unsigned Type> > -class Message > -{ > -public: > - template <typename ...PayloadArgs> > - Message(PayloadArgs... payload) > - : hdr(StreamDevHeader { > - .protocol_version = STREAM_DEVICE_PROTOCOL, > - .padding = 0, // Workaround GCC bug "sorry: not implemented" > - .type = Type, > - .size = (uint32_t) Info::size(payload...) > - }) > - { } > - void write_header(Stream &stream) > - { > - stream.write_all("header", &hdr, sizeof(hdr)); > - } > - > -protected: > - StreamDevHeader hdr; > - typedef Payload payload_t; > -}; > - > class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT> > { > public: > @@ -156,20 +86,6 @@ public: > } > }; > > -class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES> > -{ > -public: > - CapabilitiesMessage() : Message() {} > - static size_t size() > - { > - return sizeof(payload_t); > - } > - void write_message_body(Stream &stream) > - { > - /* No body for capabilities message */ > - } > -}; > - > class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET> > { > public: > @@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream &stream) > > }} // namespace spice::streaming_agent > > -static bool quit_requested = false; > - > -int Stream::have_something_to_read(int timeout) > -{ > - struct pollfd pollfd = {streamfd, POLLIN, 0}; > - > - if (poll(&pollfd, 1, timeout) < 0) { > - syslog(LOG_ERR, "poll FAILED\n"); > - return -1; > - } > - > - if (pollfd.revents == POLLIN) { > - return 1; > - } > - > - return 0; > -} > - > -void Stream::handle_stream_start_stop(uint32_t len) > -{ > - uint8_t msg[256]; > - > - if (len >= sizeof(msg)) { > - throw MessageDataError("message is too long", len, sizeof(msg)); > - } > - int n = read(streamfd, &msg, len); > - if (n != (int) len) { > - throw MessageDataError("read start/stop command from device failed", n, len, errno); > - } > - is_streaming = (msg[0] != 0); /* num_codecs */ > - syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n", > - is_streaming ? "START" : "STOP"); > - codecs.clear(); > - for (int i = 1; i <= msg[0]; ++i) { > - codecs.insert((SpiceVideoCodecType) msg[i]); > - } > -} > - > -void Stream::handle_stream_capabilities(uint32_t len) > -{ > - uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES]; > - > - if (len > sizeof(caps)) { > - throw MessageDataError("capability message too long", len, sizeof(caps)); > - } > - int n = read(streamfd, caps, len); > - if (n != (int) len) { > - throw MessageDataError("read capabilities from device failed", n, len, errno); > - } > - > - // we currently do not support extensions so just reply so > - send<CapabilitiesMessage>(); > -} > - > -void Stream::handle_stream_error(uint32_t len) > -{ > - // TODO read message and use it > - throw ProtocolError("got an error message from server"); > -} > - > -void Stream::read_command_from_device() > -{ > - StreamDevHeader hdr; > - int n; > - > - std::lock_guard<std::mutex> stream_guard(mutex); > - n = read(streamfd, &hdr, sizeof(hdr)); > - if (n != sizeof(hdr)) { > - throw MessageDataError("read command from device failed", n, sizeof(hdr), errno); > - } > - if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) { > - throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL); > - } > - > - switch (hdr.type) { > - case STREAM_TYPE_CAPABILITIES: > - return handle_stream_capabilities(hdr.size); > - case STREAM_TYPE_NOTIFY_ERROR: > - return handle_stream_error(hdr.size); > - case STREAM_TYPE_START_STOP: > - return handle_stream_start_stop(hdr.size); > - } > - throw MessageDataError("unknown message type", hdr.type, 0); > -} > - > -int Stream::read_command(bool blocking) > -{ > - int timeout = blocking?-1:0; > - while (!quit_requested) { > - if (!have_something_to_read(timeout)) { > - if (!blocking) { > - return 0; > - } > - sleep(1); > - continue; > - } > - read_command_from_device(); > - break; > - } > - > - return 1; > -} > - > -void Stream::write_all(const char *operation, const void *buf, const size_t len) > -{ > - size_t written = 0; > - while (written < len) { > - int l = write(streamfd, (const char *) buf + written, len - written); > - if (l < 0) { > - if (errno == EINTR) { > - continue; > - } > - throw WriteError("write failed", operation, errno).syslog(); > - } > - written += l; > - } > - syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written); > -} > +bool quit_requested = false; > > static void handle_interrupt(int intr) > { > diff --git a/src/stream.cpp b/src/stream.cpp > new file mode 100644 > index 0000000..f756097 > --- /dev/null > +++ b/src/stream.cpp > @@ -0,0 +1,172 @@ > +/* Encapsulation of the stream used to communicate between agent and server > + * > + * \copyright > + * Copyright 2018 Red Hat Inc. All rights reserved. > + */ > + > +#include "stream.hpp" > +#include "message.hpp" > + > +#include <spice/stream-device.h> > + > +#include <spice-streaming-agent/errors.hpp> > + > +#include <sys/types.h> > +#include <sys/stat.h> > +#include <fcntl.h> > +#include <poll.h> > +#include <syslog.h> > +#include <unistd.h> > + > +namespace spice > +{ > +namespace streaming_agent > +{ > + > +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, > + STREAM_TYPE_CAPABILITIES> > +{ > +public: > + CapabilitiesMessage() : Message() {} > + static size_t size() > + { > + return sizeof(payload_t); > + } > + void write_message_body(Stream &stream) > + { > + /* No body for capabilities message */ > + } > +}; Not sure I like scattering the messages across source files that happen to use them, though I suppose you did it because each message (like the X11Cursor) may require different header files included? Perhaps it is the way to go... > + > +Stream::Stream(const char *name) > + : codecs() > +{ > + streamfd = open(name, O_RDWR); > + if (streamfd < 0) { > + throw IOError("failed to open streaming device", errno); > + } > +} > + > +Stream::~Stream() > +{ > + close(streamfd); > +} > + > +int Stream::have_something_to_read(int timeout) > +{ > + struct pollfd pollfd = {streamfd, POLLIN, 0}; > + > + if (poll(&pollfd, 1, timeout) < 0) { > + syslog(LOG_ERR, "poll FAILED\n"); > + return -1; > + } > + > + if (pollfd.revents == POLLIN) { > + return 1; > + } > + > + return 0; > +} > + > +void Stream::handle_stream_start_stop(uint32_t len) > +{ > + uint8_t msg[256]; > + > + if (len >= sizeof(msg)) { > + throw MessageDataError("message is too long", len, sizeof(msg)); > + } > + int n = read(streamfd, &msg, len); > + if (n != (int) len) { > + throw MessageDataError("read start/stop command from device failed", n, len, errno); > + } > + is_streaming = (msg[0] != 0); /* num_codecs */ > + syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n", > + is_streaming ? "START" : "STOP"); > + codecs.clear(); > + for (int i = 1; i <= msg[0]; ++i) { > + codecs.insert((SpiceVideoCodecType) msg[i]); > + } > +} > + > +void Stream::handle_stream_capabilities(uint32_t len) > +{ > + uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES]; > + > + if (len > sizeof(caps)) { > + throw MessageDataError("capability message too long", len, sizeof(caps)); > + } > + int n = read(streamfd, caps, len); > + if (n != (int) len) { > + throw MessageDataError("read capabilities from device failed", n, len, errno); > + } > + > + // we currently do not support extensions so just reply so > + send<CapabilitiesMessage>(); > +} > + > +void Stream::handle_stream_error(uint32_t len) > +{ > + // TODO read message and use it > + throw ProtocolError("got an error message from server"); > +} > + > +void Stream::read_command_from_device() > +{ > + StreamDevHeader hdr; > + int n; > + > + std::lock_guard<std::mutex> stream_guard(mutex); > + n = read(streamfd, &hdr, sizeof(hdr)); > + if (n != sizeof(hdr)) { > + throw MessageDataError("read command from device failed", n, sizeof(hdr), errno); > + } > + if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) { > + throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL); > + } > + > + switch (hdr.type) { > + case STREAM_TYPE_CAPABILITIES: > + return handle_stream_capabilities(hdr.size); > + case STREAM_TYPE_NOTIFY_ERROR: > + return handle_stream_error(hdr.size); > + case STREAM_TYPE_START_STOP: > + return handle_stream_start_stop(hdr.size); > + } > + throw MessageDataError("unknown message type", hdr.type, 0); > +} > + > +int Stream::read_command(bool blocking) > +{ > + int timeout = blocking?-1:0; > + while (!quit_requested) { > + if (!have_something_to_read(timeout)) { > + if (!blocking) { > + return 0; > + } > + sleep(1); > + continue; > + } > + read_command_from_device(); > + break; > + } > + > + return 1; > +} > + > +void Stream::write_all(const char *operation, const void *buf, const size_t len) > +{ > + size_t written = 0; > + while (written < len) { > + int l = write(streamfd, (const char *) buf + written, len - written); > + if (l < 0) { > + if (errno == EINTR) { > + continue; > + } > + throw WriteError("write failed", operation, errno).syslog(); > + } > + written += l; > + } > + syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written); > +} > + > +}} // namespace spice::streaming_agent > diff --git a/src/stream.hpp b/src/stream.hpp > new file mode 100644 > index 0000000..b689f36 > --- /dev/null > +++ b/src/stream.hpp > @@ -0,0 +1,55 @@ > +/* Encapsulation of the stream used to communicate between agent and server > + * > + * \copyright > + * Copyright 2018 Red Hat Inc. All rights reserved. > + */ > +#ifndef SPICE_STREAMING_AGENT_STREAM_HPP > +#define SPICE_STREAMING_AGENT_STREAM_HPP > + > +#include <spice/enums.h> > +#include <set> > +#include <mutex> > + > +namespace spice { > +namespace streaming_agent { > + > +class Stream > +{ > + typedef std::set<SpiceVideoCodecType> codecs_t; > + > +public: > + Stream(const char *name); > + ~Stream(); > + > + const codecs_t &client_codecs() { return codecs; } > + bool streaming_requested() { return is_streaming; } > + > + template <typename Message, typename ...PayloadArgs> > + void send(PayloadArgs... payload) > + { > + Message message(payload...); > + std::lock_guard<std::mutex> stream_guard(mutex); > + message.write_header(*this); > + message.write_message_body(*this, payload...); > + } > + > + int read_command(bool blocking); > + void write_all(const char *operation, const void *buf, const size_t len); > + > +private: > + int have_something_to_read(int timeout); > + void handle_stream_start_stop(uint32_t len); > + void handle_stream_capabilities(uint32_t len); > + void handle_stream_error(uint32_t len); > + void read_command_from_device(void); > + > +private: > + std::mutex mutex; > + codecs_t codecs; > + int streamfd = -1; > + bool is_streaming = false; > +}; > + > +}} // namespace spice::streaming_agent > + > +#endif // SPICE_STREAMING_AGENT_ERRORS_HPP _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel