From: Christophe de Dinechin <dinechin@xxxxxxxxxx> Doing this change will make it possible to move the capture loop to the concrete-agent.cpp file. Signed-off-by: Christophe de Dinechin <dinechin@xxxxxxxxxx> --- include/spice-streaming-agent/errors.hpp | 2 + src/Makefile.am | 2 + src/message.hpp | 41 ++++++ src/spice-streaming-agent.cpp | 209 +------------------------------ src/stream.cpp | 172 +++++++++++++++++++++++++ src/stream.hpp | 55 ++++++++ 6 files changed, 276 insertions(+), 205 deletions(-) create mode 100644 src/message.hpp create mode 100644 src/stream.cpp create mode 100644 src/stream.hpp diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp index 870a0fd..62ae010 100644 --- a/include/spice-streaming-agent/errors.hpp +++ b/include/spice-streaming-agent/errors.hpp @@ -90,4 +90,6 @@ protected: }} // namespace spice::streaming_agent +extern bool quit_requested; + #endif // SPICE_STREAMING_AGENT_ERRORS_HPP diff --git a/src/Makefile.am b/src/Makefile.am index 2507844..923a103 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \ mjpeg-fallback.hpp \ jpeg.cpp \ jpeg.hpp \ + stream.cpp \ + stream.hpp \ errors.cpp \ $(NULL) diff --git a/src/message.hpp b/src/message.hpp new file mode 100644 index 0000000..28b3e28 --- /dev/null +++ b/src/message.hpp @@ -0,0 +1,41 @@ +/* Formatting messages + * + * \copyright + * Copyright 2018 Red Hat Inc. All rights reserved. + */ +#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP +#define SPICE_STREAMING_AGENT_MESSAGE_HPP + +#include <spice/stream-device.h> + +namespace spice +{ +namespace streaming_agent +{ + +template <typename Payload, typename Info, unsigned Type> +class Message +{ +public: + template <typename ...PayloadArgs> + Message(PayloadArgs... 
payload) + : hdr(StreamDevHeader { + .protocol_version = STREAM_DEVICE_PROTOCOL, + .padding = 0, // Workaround GCC bug "sorry: not implemented" + .type = Type, + .size = (uint32_t) Info::size(payload...) + }) + { } + void write_header(Stream &stream) + { + stream.write_all("header", &hdr, sizeof(hdr)); + } + +protected: + StreamDevHeader hdr; + typedef Payload payload_t; +}; + +}} // namespace spice::streaming_agent + +#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp index 35e65bb..c401a34 100644 --- a/src/spice-streaming-agent.cpp +++ b/src/spice-streaming-agent.cpp @@ -5,6 +5,8 @@ */ #include "concrete-agent.hpp" +#include "stream.hpp" +#include "message.hpp" #include "hexdump.h" #include "mjpeg-fallback.hpp" @@ -21,11 +23,9 @@ #include <inttypes.h> #include <string.h> #include <getopt.h> -#include <unistd.h> #include <errno.h> -#include <fcntl.h> +#include <unistd.h> #include <sys/time.h> -#include <poll.h> #include <syslog.h> #include <signal.h> #include <exception> @@ -57,76 +57,6 @@ static uint64_t get_time(void) } -class Stream -{ - typedef std::set<SpiceVideoCodecType> codecs_t; - -public: - Stream(const char *name) - : codecs() - { - streamfd = open(name, O_RDWR); - if (streamfd < 0) { - throw IOError("failed to open streaming device", errno); - } - } - ~Stream() - { - close(streamfd); - } - - const codecs_t &client_codecs() { return codecs; } - bool streaming_requested() { return is_streaming; } - - template <typename Message, typename ...PayloadArgs> - void send(PayloadArgs... 
payload) - { - Message message(payload...); - std::lock_guard<std::mutex> stream_guard(mutex); - message.write_header(*this); - message.write_message_body(*this, payload...); - } - - int read_command(bool blocking); - void write_all(const char *operation, const void *buf, const size_t len); - -private: - int have_something_to_read(int timeout); - void handle_stream_start_stop(uint32_t len); - void handle_stream_capabilities(uint32_t len); - void handle_stream_error(uint32_t len); - void read_command_from_device(void); - -private: - std::mutex mutex; - codecs_t codecs; - int streamfd = -1; - bool is_streaming = false; -}; - -template <typename Payload, typename Info, unsigned Type> -class Message -{ -public: - template <typename ...PayloadArgs> - Message(PayloadArgs... payload) - : hdr(StreamDevHeader { - .protocol_version = STREAM_DEVICE_PROTOCOL, - .padding = 0, // Workaround GCC bug "sorry: not implemented" - .type = Type, - .size = (uint32_t) Info::size(payload...) - }) - { } - void write_header(Stream &stream) - { - stream.write_all("header", &hdr, sizeof(hdr)); - } - -protected: - StreamDevHeader hdr; - typedef Payload payload_t; -}; - class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT> { public: @@ -156,20 +86,6 @@ public: } }; -class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES> -{ -public: - CapabilitiesMessage() : Message() {} - static size_t size() - { - return sizeof(payload_t); - } - void write_message_body(Stream &stream) - { - /* No body for capabilities message */ - } -}; - class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET> { public: @@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream &stream) }} // namespace spice::streaming_agent -static bool quit_requested = false; - -int Stream::have_something_to_read(int timeout) -{ - struct pollfd pollfd = {streamfd, POLLIN, 0}; - - if (poll(&pollfd, 1, timeout) 
< 0) { - syslog(LOG_ERR, "poll FAILED\n"); - return -1; - } - - if (pollfd.revents == POLLIN) { - return 1; - } - - return 0; -} - -void Stream::handle_stream_start_stop(uint32_t len) -{ - uint8_t msg[256]; - - if (len >= sizeof(msg)) { - throw MessageDataError("message is too long", len, sizeof(msg)); - } - int n = read(streamfd, &msg, len); - if (n != (int) len) { - throw MessageDataError("read start/stop command from device failed", n, len, errno); - } - is_streaming = (msg[0] != 0); /* num_codecs */ - syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n", - is_streaming ? "START" : "STOP"); - codecs.clear(); - for (int i = 1; i <= msg[0]; ++i) { - codecs.insert((SpiceVideoCodecType) msg[i]); - } -} - -void Stream::handle_stream_capabilities(uint32_t len) -{ - uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES]; - - if (len > sizeof(caps)) { - throw MessageDataError("capability message too long", len, sizeof(caps)); - } - int n = read(streamfd, caps, len); - if (n != (int) len) { - throw MessageDataError("read capabilities from device failed", n, len, errno); - } - - // we currently do not support extensions so just reply so - send<CapabilitiesMessage>(); -} - -void Stream::handle_stream_error(uint32_t len) -{ - // TODO read message and use it - throw ProtocolError("got an error message from server"); -} - -void Stream::read_command_from_device() -{ - StreamDevHeader hdr; - int n; - - std::lock_guard<std::mutex> stream_guard(mutex); - n = read(streamfd, &hdr, sizeof(hdr)); - if (n != sizeof(hdr)) { - throw MessageDataError("read command from device failed", n, sizeof(hdr), errno); - } - if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) { - throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL); - } - - switch (hdr.type) { - case STREAM_TYPE_CAPABILITIES: - return handle_stream_capabilities(hdr.size); - case STREAM_TYPE_NOTIFY_ERROR: - return handle_stream_error(hdr.size); - case STREAM_TYPE_START_STOP: - 
return handle_stream_start_stop(hdr.size); - } - throw MessageDataError("unknown message type", hdr.type, 0); -} - -int Stream::read_command(bool blocking) -{ - int timeout = blocking?-1:0; - while (!quit_requested) { - if (!have_something_to_read(timeout)) { - if (!blocking) { - return 0; - } - sleep(1); - continue; - } - read_command_from_device(); - break; - } - - return 1; -} - -void Stream::write_all(const char *operation, const void *buf, const size_t len) -{ - size_t written = 0; - while (written < len) { - int l = write(streamfd, (const char *) buf + written, len - written); - if (l < 0) { - if (errno == EINTR) { - continue; - } - throw WriteError("write failed", operation, errno).syslog(); - } - written += l; - } - syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written); -} +bool quit_requested = false; static void handle_interrupt(int intr) { diff --git a/src/stream.cpp b/src/stream.cpp new file mode 100644 index 0000000..f756097 --- /dev/null +++ b/src/stream.cpp @@ -0,0 +1,172 @@ +/* Encapsulation of the stream used to communicate between agent and server + * + * \copyright + * Copyright 2018 Red Hat Inc. All rights reserved. 
+ */ + +#include "stream.hpp" +#include "message.hpp" + +#include <spice/stream-device.h> + +#include <spice-streaming-agent/errors.hpp> + +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <poll.h> +#include <syslog.h> +#include <unistd.h> + +namespace spice +{ +namespace streaming_agent +{ + +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, + STREAM_TYPE_CAPABILITIES> +{ +public: + CapabilitiesMessage() : Message() {} + static size_t size() + { + return sizeof(payload_t); + } + void write_message_body(Stream &stream) + { + /* No body for capabilities message */ + } +}; + +Stream::Stream(const char *name) + : codecs() +{ + streamfd = open(name, O_RDWR); + if (streamfd < 0) { + throw IOError("failed to open streaming device", errno); + } +} + +Stream::~Stream() +{ + close(streamfd); +} + +int Stream::have_something_to_read(int timeout) +{ + struct pollfd pollfd = {streamfd, POLLIN, 0}; + + if (poll(&pollfd, 1, timeout) < 0) { + syslog(LOG_ERR, "poll FAILED\n"); + return -1; + } + + if (pollfd.revents == POLLIN) { + return 1; + } + + return 0; +} + +void Stream::handle_stream_start_stop(uint32_t len) +{ + uint8_t msg[256]; + + if (len >= sizeof(msg)) { + throw MessageDataError("message is too long", len, sizeof(msg)); + } + int n = read(streamfd, &msg, len); + if (n != (int) len) { + throw MessageDataError("read start/stop command from device failed", n, len, errno); + } + is_streaming = (msg[0] != 0); /* num_codecs */ + syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n", + is_streaming ? 
"START" : "STOP"); + codecs.clear(); + for (int i = 1; i <= msg[0]; ++i) { + codecs.insert((SpiceVideoCodecType) msg[i]); + } +} + +void Stream::handle_stream_capabilities(uint32_t len) +{ + uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES]; + + if (len > sizeof(caps)) { + throw MessageDataError("capability message too long", len, sizeof(caps)); + } + int n = read(streamfd, caps, len); + if (n != (int) len) { + throw MessageDataError("read capabilities from device failed", n, len, errno); + } + + // we currently do not support extensions so just reply so + send<CapabilitiesMessage>(); +} + +void Stream::handle_stream_error(uint32_t len) +{ + // TODO read message and use it + throw ProtocolError("got an error message from server"); +} + +void Stream::read_command_from_device() +{ + StreamDevHeader hdr; + int n; + + std::lock_guard<std::mutex> stream_guard(mutex); + n = read(streamfd, &hdr, sizeof(hdr)); + if (n != sizeof(hdr)) { + throw MessageDataError("read command from device failed", n, sizeof(hdr), errno); + } + if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) { + throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL); + } + + switch (hdr.type) { + case STREAM_TYPE_CAPABILITIES: + return handle_stream_capabilities(hdr.size); + case STREAM_TYPE_NOTIFY_ERROR: + return handle_stream_error(hdr.size); + case STREAM_TYPE_START_STOP: + return handle_stream_start_stop(hdr.size); + } + throw MessageDataError("unknown message type", hdr.type, 0); +} + +int Stream::read_command(bool blocking) +{ + int timeout = blocking?-1:0; + while (!quit_requested) { + if (!have_something_to_read(timeout)) { + if (!blocking) { + return 0; + } + sleep(1); + continue; + } + read_command_from_device(); + break; + } + + return 1; +} + +void Stream::write_all(const char *operation, const void *buf, const size_t len) +{ + size_t written = 0; + while (written < len) { + int l = write(streamfd, (const char *) buf + written, len - written); + if (l < 0) { 
+ if (errno == EINTR) { + continue; + } + throw WriteError("write failed", operation, errno).syslog(); + } + written += l; + } + syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written); +} + +}} // namespace spice::streaming_agent diff --git a/src/stream.hpp b/src/stream.hpp new file mode 100644 index 0000000..b689f36 --- /dev/null +++ b/src/stream.hpp @@ -0,0 +1,55 @@ +/* Encapsulation of the stream used to communicate between agent and server + * + * \copyright + * Copyright 2018 Red Hat Inc. All rights reserved. + */ +#ifndef SPICE_STREAMING_AGENT_STREAM_HPP +#define SPICE_STREAMING_AGENT_STREAM_HPP + +#include <spice/enums.h> +#include <set> +#include <mutex> + +namespace spice { +namespace streaming_agent { + +class Stream +{ + typedef std::set<SpiceVideoCodecType> codecs_t; + +public: + Stream(const char *name); + ~Stream(); + + const codecs_t &client_codecs() { return codecs; } + bool streaming_requested() { return is_streaming; } + + template <typename Message, typename ...PayloadArgs> + void send(PayloadArgs... payload) + { + Message message(payload...); + std::lock_guard<std::mutex> stream_guard(mutex); + message.write_header(*this); + message.write_message_body(*this, payload...); + } + + int read_command(bool blocking); + void write_all(const char *operation, const void *buf, const size_t len); + +private: + int have_something_to_read(int timeout); + void handle_stream_start_stop(uint32_t len); + void handle_stream_capabilities(uint32_t len); + void handle_stream_error(uint32_t len); + void read_command_from_device(void); + +private: + std::mutex mutex; + codecs_t codecs; + int streamfd = -1; + bool is_streaming = false; +}; + +}} // namespace spice::streaming_agent + +#endif // SPICE_STREAMING_AGENT_STREAM_HPP -- 2.13.5 (Apple Git-94) _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel