> > On Tue, 2018-10-09 at 04:02 -0400, Frediano Ziglio wrote: > > > On Mon, 2018-10-08 at 07:03 -0400, Frediano Ziglio wrote: > > > > > > > > > > Wraps the deserialization of the received messages in an > > > > > InboundMessages > > > > > class. The class is created with the deserialized header and the raw > > > > > data of the message. A template function get_payload() returns the > > > > > struct of the concrete message. The function is specialized for each > > > > > incoming message. > > > > > > > > > > While this leaves the responsibility to call the get_payload() > > > > > function > > > > > with the message according to the type in the header to the caller, > > > > > the > > > > > solution preserves the efficiency of the original implementation > > > > > without > > > > > introducing too much complexity around the separation of the code. > > > > > > > > > > Signed-off-by: Lukáš Hrázký <lhrazky@xxxxxxxxxx> > > > > > --- > > > > > src/spice-streaming-agent.cpp | 115 > > > > > +++++++++------------------------- > > > > > src/stream-port.cpp | 70 ++++++++++++++++++++- > > > > > src/stream-port.hpp | 41 +++++++++++- > > > > > 3 files changed, 139 insertions(+), 87 deletions(-) > > > > > > > > > > diff --git a/src/spice-streaming-agent.cpp > > > > > b/src/spice-streaming-agent.cpp > > > > > index a9baf4d..a89ba3f 100644 > > > > > --- a/src/spice-streaming-agent.cpp > > > > > +++ b/src/spice-streaming-agent.cpp > > > > > @@ -77,92 +77,39 @@ static bool have_something_to_read(StreamPort > > > > > &stream_port, bool blocking) > > > > > return false; > > > > > } > > > > > > > > > > -static void handle_stream_start_stop(StreamPort &stream_port, > > > > > uint32_t > > > > > len) > > > > > -{ > > > > > - uint8_t msg[256]; > > > > > - > > > > > - if (len >= sizeof(msg)) { > > > > > - throw std::runtime_error("msg size (" + std::to_string(len) > > > > > + ") > > > > > is > > > > > too long " > > > > > - "(longer than " + > > > > > std::to_string(sizeof(msg)) + ")"); > > > > > - } > > > > > - > > > > > - stream_port.read(msg, len); > > > > > - streaming_requested = (msg[0] != 0); /* num_codecs */ > > > > > - syslog(LOG_INFO, "GOT START_STOP message -- request to %s > > > > > streaming", > > > > > - streaming_requested ? "START" : "STOP"); > > > > > - client_codecs.clear(); > > > > > - for (int i = 1; i <= msg[0]; ++i) { > > > > > - client_codecs.insert((SpiceVideoCodecType) msg[i]); > > > > > - } > > > > > -} > > > > > - > > > > > -static void handle_stream_capabilities(StreamPort &stream_port, > > > > > uint32_t > > > > > len) > > > > > -{ > > > > > - uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES]; > > > > > - > > > > > - if (len > sizeof(caps)) { > > > > > - throw std::runtime_error("capability message too long"); > > > > > - } > > > > > - > > > > > - stream_port.read(caps, len); > > > > > - // we currently do not support extensions so just reply so > > > > > - StreamDevHeader hdr = { > > > > > - STREAM_DEVICE_PROTOCOL, > > > > > - 0, > > > > > - STREAM_TYPE_CAPABILITIES, > > > > > - 0 > > > > > - }; > > > > > - > > > > > - stream_port.write(&hdr, sizeof(hdr)); > > > > > -} > > > > > - > > > > > -static void handle_stream_error(StreamPort &stream_port, size_t len) > > > > > -{ > > > > > - if (len < sizeof(StreamMsgNotifyError)) { > > > > > - throw std::runtime_error("Received NotifyError message size > > > > > " + > > > > > std::to_string(len) + > > > > > - " is too small (smaller than " + > > > > > - > > > > > std::to_string(sizeof(StreamMsgNotifyError)) > > > > > + ")"); > > > > > - } > > > > > - > > > > > - struct StreamMsgNotifyError1K : StreamMsgNotifyError { > > > > > - uint8_t msg[1024]; > > > > > - } msg; > > > > > - > > > > > - size_t len_to_read = std::min(len, sizeof(msg) - 1); > > > > > - > > > > > - stream_port.read(&msg, len_to_read); > > > > > - msg.msg[len_to_read - sizeof(StreamMsgNotifyError)] = '\0'; > > > > > - > > > > > - syslog(LOG_ERR, "Received NotifyError message from the server: > > > > > %d - > > > > > %s", > > > > > - msg.error_code, msg.msg); > > > > > - > > > > > - if (len_to_read < len) { > > > > > - throw std::runtime_error("Received NotifyError message size > > > > > " + > > > > > std::to_string(len) + > > > > > - " is too big (bigger than " + > > > > > std::to_string(sizeof(msg)) + ")"); > > > > > - } > > > > > -} > > > > > - > > > > > static void read_command_from_device(StreamPort &stream_port) > > > > > { > > > > > - StreamDevHeader hdr; > > > > > - > > > > > - std::lock_guard<std::mutex> guard(stream_port.mutex); > > > > > - stream_port.read(&hdr, sizeof(hdr)); > > > > > - > > > > > - if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) { > > > > > - throw std::runtime_error("BAD VERSION " + > > > > > std::to_string(hdr.protocol_version) + > > > > > - " (expected is " + > > > > > std::to_string(STREAM_DEVICE_PROTOCOL) + ")"); > > > > > - } > > > > > - > > > > > - switch (hdr.type) { > > > > > - case STREAM_TYPE_CAPABILITIES: > > > > > - return handle_stream_capabilities(stream_port, hdr.size); > > > > > - case STREAM_TYPE_NOTIFY_ERROR: > > > > > - return handle_stream_error(stream_port, hdr.size); > > > > > - case STREAM_TYPE_START_STOP: > > > > > - return handle_stream_start_stop(stream_port, hdr.size); > > > > > - } > > > > > - throw std::runtime_error("UNKNOWN msg of type " + > > > > > std::to_string(hdr.type)); > > > > > + InboundMessage in_message = stream_port.receive(); > > > > > + > > > > > + switch (in_message.header.type) { > > > > > + case STREAM_TYPE_CAPABILITIES: { > > > > > + StreamDevHeader hdr = { > > > > > + STREAM_DEVICE_PROTOCOL, > > > > > + 0, > > > > > + STREAM_TYPE_CAPABILITIES, > > > > > + 0 > > > > > + }; > > > > > + > > > > > + std::lock_guard<std::mutex> guard(stream_port.mutex); > > > > > + stream_port.write(&hdr, sizeof(hdr)); > > > > > + return; > > > > > + } case STREAM_TYPE_NOTIFY_ERROR: { > > > > > > > > Not against this line style but case are not aligned and we never > > > > used this style. > > > > > > Checking the coding style guide, it should be: > > > > > > // ... > > > } > > > case STREAM_TYPE_NOTIFY_ERROR: { > > > // ... > > > > > > Is this what you meant? I'll fix it in the next version. > > > > > > > Yes, was that > > > > > > > + NotifyErrorMessage msg = > > > > > in_message.get_payload<NotifyErrorMessage>(); > > > > > + > > > > > + syslog(LOG_ERR, "Received NotifyError message from the > > > > > server: > > > > > %d - > > > > > %s", > > > > > + msg.error_code, msg.message); > > > > > + return; > > > > > + } case STREAM_TYPE_START_STOP: { > > > > > + StartStopMessage msg = > > > > > in_message.get_payload<StartStopMessage>(); > > > > > + streaming_requested = msg.start_streaming; > > > > > + client_codecs = msg.client_codecs; > > > > > + > > > > > + syslog(LOG_INFO, "GOT START_STOP message -- request to %s > > > > > streaming", > > > > > + streaming_requested ? "START" : "STOP"); > > > > > + return; > > > > > + }} > > > > > + > > > > > + throw std::runtime_error("UNKNOWN msg of type " + > > > > > std::to_string(in_message.header.type)); > > > > > } > > > > > > > > > > static void read_command(StreamPort &stream_port, bool blocking) > > > > > diff --git a/src/stream-port.cpp b/src/stream-port.cpp > > > > > index 5528854..56747fd 100644 > > > > > --- a/src/stream-port.cpp > > > > > +++ b/src/stream-port.cpp > > > > > @@ -19,6 +19,58 @@ > > > > > namespace spice { > > > > > namespace streaming_agent { > > > > > > > > > > +InboundMessage::InboundMessage(const StreamDevHeader &header, > > > > > std::unique_ptr<uint8_t[]> &&data) : > > > > > + header(header), > > > > > + data(std::move(data)) > > > > > +{} > > > > > + > > > > > +template<> > > > > > +StartStopMessage InboundMessage::get_payload<StartStopMessage>() > > > > > +{ > > > > > > > > why not also checking that the message is really the right type > > > > instead of assuming the caller is doing the right thing? > > > > > > I didn't intend to make a safe and clean API for the stream-port > > > module. I aimed for simplicity instead, treating it as an integral part > > > of the streaming agent. If I wanted a safe and clean API, I'd probably > > > need to abstract other parts of the interface too, which would add > > > boilerplate, if not overhead. > > > > > > I can add the checks in the methods if you like, it just seems > > > redundant to me in the current rather simple situation and the tight > > > coupling between the stream-port module and its usage. > > > > > > > Don't mine, fine as it it. > > > > > > > + StartStopMessage msg; > > > > > + > > > > > + msg.start_streaming = data[0]; // num_codecs > > > > > + > > > > > + for (size_t i = 1; i <= data[0]; ++i) { > > > > > + msg.client_codecs.insert((SpiceVideoCodecType) data[i]); > > > > > + } > > > > > + > > > > > + return msg; > > > > > +} > > > > > + > > > > > +template<> > > > > > +InCapabilitiesMessage > > > > > InboundMessage::get_payload<InCapabilitiesMessage>() > > > > > +{ > > > > > + // no capabilities yet > > > > > + return InCapabilitiesMessage(); > > > > > +} > > > > > + > > > > > +template<> > > > > > +NotifyErrorMessage InboundMessage::get_payload<NotifyErrorMessage>() > > > > > +{ > > > > > + if (header.size < sizeof(StreamMsgNotifyError)) { > > > > > + throw std::runtime_error("Received NotifyError message size > > > > > " + > > > > > std::to_string(header.size) + > > > > > + " is too small (smaller than " + > > > > > + > > > > > std::to_string(sizeof(StreamMsgNotifyError)) > > > > > + ")"); > > > > > + } > > > > > + > > > > > + size_t msg_len = header.size - sizeof(StreamMsgNotifyError); > > > > > + if (msg_len > 1024) { > > > > > + throw std::runtime_error("Received NotifyError message is > > > > > too > > > > > long > > > > > (" + > > > > > + std::to_string(msg_len) + " > > > > > > 1024)"); > > > > > + } > > > > > + > > > > > + StreamMsgNotifyError *raw_message = > > > > > reinterpret_cast<StreamMsgNotifyError*>(data.get()); > > > > > + > > > > > + NotifyErrorMessage msg; > > > > > + msg.error_code = raw_message->error_code; > > > > > + strncpy(msg.message, reinterpret_cast<char*>(raw_message->msg), > > > > > msg_len); > > > > > + // make sure the string is terminated > > > > > + msg.message[msg_len] = '\0'; > > > > > + > > > > > + return msg; > > > > > +} > > > > > + > > > > > StreamPort::StreamPort(const std::string &port_name) : > > > > > fd(open(port_name.c_str(), O_RDWR | O_NONBLOCK)) > > > > > { > > > > > if (fd < 0) { > > > > > @@ -31,9 +83,23 @@ StreamPort::~StreamPort() > > > > > close(fd); > > > > > } > > > > > > > > > > -void StreamPort::read(void *buf, size_t len) > > > > > +InboundMessage StreamPort::receive() > > > > > { > > > > > - read_all(fd, buf, len); > > > > > + std::lock_guard<std::mutex> stream_guard(mutex); > > > > > + > > > > > + StreamDevHeader header; > > > > > + read_all(fd, &header, sizeof(header)); > > > > > + > > > > > + if (header.protocol_version != STREAM_DEVICE_PROTOCOL) { > > > > > + throw std::runtime_error("Bad protocol version: " + > > > > > std::to_string(header.protocol_version) + > > > > > + ", expected: " + > > > > > std::to_string(STREAM_DEVICE_PROTOCOL)); > > > > > + } > > > > > + > > > > > + // TODO should we limit the maximum message size? > > > > > > > > This is a regression from previous code, should be added again. > > > > > > Yes, the difference is, in the previous code each message had it's own > > > custom limit. That is not possible anymore with this code (a small > > > disadvantage, although the modularization is well worth it imo). > > > > > > > A switch won't kill it. Modularity should not be a limit. > > Well, it wouldn't kill it, but it would be another place where you have > to switch over all of the message types. Right now there is only one > such place, which I find very nice and convenient... > > > > So we need to add a limit that is sufficient for the biggest message > > > that can be received. Which currently seems to be the > > > NotifyErrorMessage with a size of 1028B. What shall we use as the limit > > > here? 1028? Add some room for the future and use 2048? Or even bigger? > > > > > > > That make sense too, I think 4K won't kill either. > > I'll do that. I think one common message size limit doesn't hurt > anything here. > > > > > > + std::unique_ptr<uint8_t[]> data(new uint8_t[header.size]); > > > > > + read_all(fd, data.get(), header.size); > > > > > + > > > > > + return InboundMessage(header, std::move(data)); > > > > > } > > > > > > > > > > void StreamPort::write(const void *buf, size_t len) > > > > > diff --git a/src/stream-port.hpp b/src/stream-port.hpp > > > > > index 9187cf5..090930b 100644 > > > > > --- a/src/stream-port.hpp > > > > > +++ b/src/stream-port.hpp > > > > > @@ -7,20 +7,59 @@ > > > > > #ifndef SPICE_STREAMING_AGENT_STREAM_PORT_HPP > > > > > #define SPICE_STREAMING_AGENT_STREAM_PORT_HPP > > > > > > > > > > +#include <spice/stream-device.h> > > > > > +#include <spice/enums.h> > > > > > + > > > > > #include <cstddef> > > > > > #include <string> > > > > > +#include <memory> > > > > > #include <mutex> > > > > > +#include <set> > > > > > > > > > > > > > > > namespace spice { > > > > > namespace streaming_agent { > > > > > > > > > > +struct StartStopMessage > > > > > +{ > > > > > + bool start_streaming = false; > > > > > > > > This has not much to do with the message but with the usage of it, > > > > the agent should start streaming if there are any codec supported, > > > > so should test client_codecs only (actually start_streaming == > > > > !client_codecs.empty() as currently the agent assume the list > > > > contains a codecs it can handle, which can be wrong). > > > > > > In my opinion the information to start/stop streaming is different from > > > the information contained in the list of codecs and should be kept > > > separate. Using the codec list to represent this information is an > > > abuse. > > > > > > > Yes, is different and is not something the message can say, how the > > message can know the list of plugin the agent has? Is the agent that > > knows that, not the message, the agent derive it from its knowledge > > about plugins and this message. > > I'm not sure I understand, do you mean plugins or codecs here? I'm not > sure how plugins are related, if you mean codecs, I'm still not sure > what your point is. I'm not arguing about the need for the list of > codecs and matching them against what's available on the agent. > > > > I realize this is already done on the protocol level, but I wanted to > > > keep the information separate at least here. > > > > > > > What is at protocol level? The protocol just contains a list of > > codecs supported by the client(s). Is up to the user of the message > > to decide what to do. > > The message is called StartStopStreaming, the purpose of it is to tell > the agent whether to start or to stop the streaming. The way this > information is conveyed is that if there are any codecs in the > client_codecs list, the command is "start", if there are no codecs in > the list, the command is "stop". So at the protocol level, the contents > of the client_codecs list is "abused" to infer the command. > > Yes, you can (ab)use it this way, because the start/stop command > coincides with the contents of the client_codecs list. But especially > with regards to how difficult it is for us to change the protocol, I > think it should be as clean and as explicit as possible. > Oh, make sense, I would put something like (comment and boolean) // No codecs present indicates to stop streaming msg.start_streaming = (data[0] != 0); (or !! instead of != 0) > Cheers, > Lukas > > > > Thanks for the review, > > > Lukas > > > > > > > > + std::set<SpiceVideoCodecType> client_codecs; > > > > > +}; > > > > > + > > > > > +struct InCapabilitiesMessage {}; > > > > > + > > > > > +struct NotifyErrorMessage > > > > > +{ > > > > > + uint32_t error_code; > > > > > + char message[1025]; > > > > > +}; > > > > > + > > > > > +class InboundMessage > > > > > +{ > > > > > +public: > > > > > + InboundMessage(const StreamDevHeader &header, > > > > > std::unique_ptr<uint8_t[]> > > > > > &&data); > > > > > + > > > > > + template<class Payload> Payload get_payload(); > > > > > + > > > > > + const StreamDevHeader header; > > > > > +private: > > > > > + std::unique_ptr<uint8_t[]> data; > > > > > +}; > > > > > + > > > > > +template<> > > > > > +StartStopMessage InboundMessage::get_payload<StartStopMessage>(); > > > > > +template<> > > > > > +InCapabilitiesMessage > > > > > InboundMessage::get_payload<InCapabilitiesMessage>(); > > > > > +template<> > > > > > +NotifyErrorMessage > > > > > InboundMessage::get_payload<NotifyErrorMessage>(); > > > > > + > > > > > class StreamPort { > > > > > public: > > > > > StreamPort(const std::string &port_name); > > > > > ~StreamPort(); > > > > > > > > > > - void read(void *buf, size_t len); > > > > > + InboundMessage receive(); > > > > > + > > > > > void write(const void *buf, size_t len); > > > > > > > > > > int fd; > > > > > > > > Frediano > _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel