On Thu, 2018-05-03 at 07:35 -0400, Frediano Ziglio wrote: > > Wrap the streaming virtio port along with the mutex to lock it in a > > class. Pass the class temporarily around to functions that need it until > > the functions too are consolidated into the class. > > > > Signed-off-by: Lukáš Hrázký <lhrazky@xxxxxxxxxx> > > The mutex is supposed to avoid situations like: > > 1- data header > 2- cursor header > 3- data payload > 4- cursor payload > > protecting single writes instead of full messages allows these > situations. Right, of course... > To fix this I would either: > 1- document that synchronization should be done outside the class > (either removing the mutex from the class or using it) > 2- add a lock/unlock; > 3- add a StreamPortLocked class and move write/read to it. The locker > class will hold the lock and make sure read/write are done with > the mutex held. This also allows making the mutex field private > (using friend class) and prevent usage of read/write without > the mutex held. > > (I prefer 3 but all are good). This is meant to be a temporary stage before the whole message IO is moved to the StreamPort class. So I think the easiest would be to make the StreamPort::mutex member public and keep the lock_guard in the methods for now. More refactoring should follow. 
Thanks, Lukas > Frediano > > > --- > > src/spice-streaming-agent.cpp | 104 > > +++++++++++++++++------------------------- > > src/stream-port.cpp | 25 ++++++++++ > > src/stream-port.hpp | 14 ++++++ > > 3 files changed, 81 insertions(+), 62 deletions(-) > > > > diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp > > index 692f067..2fdd02f 100644 > > --- a/src/spice-streaming-agent.cpp > > +++ b/src/spice-streaming-agent.cpp > > @@ -32,7 +32,6 @@ > > #include <exception> > > #include <stdexcept> > > #include <memory> > > -#include <mutex> > > #include <thread> > > #include <vector> > > #include <string> > > @@ -60,12 +59,10 @@ static bool streaming_requested = false; > > static bool quit_requested = false; > > static bool log_binary = false; > > static std::set<SpiceVideoCodecType> client_codecs; > > -static int streamfd = -1; > > -static std::mutex stream_mtx; > > > > -static int have_something_to_read(int timeout) > > +static int have_something_to_read(StreamPort &stream_port, int timeout) > > { > > - struct pollfd pollfd = {streamfd, POLLIN, 0}; > > + struct pollfd pollfd = {stream_port.fd, POLLIN, 0}; > > > > if (poll(&pollfd, 1, timeout) < 0) { > > syslog(LOG_ERR, "poll FAILED\n"); > > @@ -79,7 +76,7 @@ static int have_something_to_read(int timeout) > > return 0; > > } > > > > -static void handle_stream_start_stop(uint32_t len) > > +static void handle_stream_start_stop(StreamPort &stream_port, uint32_t len) > > { > > uint8_t msg[256]; > > > > @@ -88,7 +85,7 @@ static void handle_stream_start_stop(uint32_t len) > > "(longer than " + > > std::to_string(sizeof(msg)) + ")"); > > } > > > > - read_all(streamfd, msg, len); > > + stream_port.read(msg, len); > > streaming_requested = (msg[0] != 0); /* num_codecs */ > > syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n", > > streaming_requested ? 
"START" : "STOP"); > > @@ -98,7 +95,7 @@ static void handle_stream_start_stop(uint32_t len) > > } > > } > > > > -static void handle_stream_capabilities(uint32_t len) > > +static void handle_stream_capabilities(StreamPort &stream_port, uint32_t > > len) > > { > > uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES]; > > > > @@ -106,7 +103,7 @@ static void handle_stream_capabilities(uint32_t len) > > throw std::runtime_error("capability message too long"); > > } > > > > - read_all(streamfd, caps, len); > > + stream_port.read(caps, len); > > // we currently do not support extensions so just reply so > > StreamDevHeader hdr = { > > STREAM_DEVICE_PROTOCOL, > > @@ -114,10 +111,10 @@ static void handle_stream_capabilities(uint32_t len) > > STREAM_TYPE_CAPABILITIES, > > 0 > > }; > > - write_all(streamfd, &hdr, sizeof(hdr)); > > + stream_port.write(&hdr, sizeof(hdr)); > > } > > > > -static void handle_stream_error(size_t len) > > +static void handle_stream_error(StreamPort &stream_port, size_t len) > > { > > if (len < sizeof(StreamMsgNotifyError)) { > > throw std::runtime_error("Received NotifyError message size " + > > std::to_string(len) + > > @@ -131,7 +128,7 @@ static void handle_stream_error(size_t len) > > > > size_t len_to_read = std::min(len, sizeof(msg) - 1); > > > > - read_all(streamfd, &msg, len_to_read); > > + stream_port.read(&msg, len_to_read); > > msg.msg[len_to_read - sizeof(StreamMsgNotifyError)] = '\0'; > > > > syslog(LOG_ERR, "Received NotifyError message from the server: %d - > > %s\n", > > @@ -143,13 +140,11 @@ static void handle_stream_error(size_t len) > > } > > } > > > > -static void read_command_from_device(void) > > +static void read_command_from_device(StreamPort &stream_port) > > { > > StreamDevHeader hdr; > > > > - std::lock_guard<std::mutex> stream_guard(stream_mtx); > > - > > - read_all(streamfd, &hdr, sizeof(hdr)); > > + stream_port.read(&hdr, sizeof(hdr)); > > > > if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) { > > throw 
std::runtime_error("BAD VERSION " + > > std::to_string(hdr.protocol_version) + > > @@ -158,34 +153,34 @@ static void read_command_from_device(void) > > > > switch (hdr.type) { > > case STREAM_TYPE_CAPABILITIES: > > - return handle_stream_capabilities(hdr.size); > > + return handle_stream_capabilities(stream_port, hdr.size); > > case STREAM_TYPE_NOTIFY_ERROR: > > - return handle_stream_error(hdr.size); > > + return handle_stream_error(stream_port, hdr.size); > > case STREAM_TYPE_START_STOP: > > - return handle_stream_start_stop(hdr.size); > > + return handle_stream_start_stop(stream_port, hdr.size); > > } > > throw std::runtime_error("UNKNOWN msg of type " + > > std::to_string(hdr.type)); > > } > > > > -static int read_command(bool blocking) > > +static int read_command(StreamPort &stream_port, bool blocking) > > { > > int timeout = blocking?-1:0; > > while (!quit_requested) { > > - if (!have_something_to_read(timeout)) { > > + if (!have_something_to_read(stream_port, timeout)) { > > if (!blocking) { > > return 0; > > } > > sleep(1); > > continue; > > } > > - read_command_from_device(); > > + read_command_from_device(stream_port); > > break; > > } > > > > return 1; > > } > > > > -static void spice_stream_send_format(unsigned w, unsigned h, unsigned c) > > +static void spice_stream_send_format(StreamPort &stream_port, unsigned w, > > unsigned h, unsigned c) > > { > > > > SpiceStreamFormatMessage msg; > > @@ -199,11 +194,10 @@ static void spice_stream_send_format(unsigned w, > > unsigned h, unsigned c) > > msg.msg.height = h; > > msg.msg.codec = c; > > syslog(LOG_DEBUG, "writing format\n"); > > - std::lock_guard<std::mutex> stream_guard(stream_mtx); > > - write_all(streamfd, &msg, msgsize); > > + stream_port.write(&msg, msgsize); > > } > > > > -static void spice_stream_send_frame(const void *buf, const unsigned size) > > +static void spice_stream_send_frame(StreamPort &stream_port, const void > > *buf, const unsigned size) > > { > > SpiceStreamDataMessage msg; > > 
const size_t msgsize = sizeof(msg); > > @@ -212,9 +206,8 @@ static void spice_stream_send_frame(const void *buf, > > const unsigned size) > > msg.hdr.protocol_version = STREAM_DEVICE_PROTOCOL; > > msg.hdr.type = STREAM_TYPE_DATA; > > msg.hdr.size = size; /* includes only the body? */ > > - std::lock_guard<std::mutex> stream_guard(stream_mtx); > > - write_all(streamfd, &msg, msgsize); > > - write_all(streamfd, buf, size); > > + stream_port.write(&msg, msgsize); > > + stream_port.write(buf, size); > > > > syslog(LOG_DEBUG, "Sent a frame of size %u\n", size); > > } > > @@ -264,7 +257,7 @@ static void usage(const char *progname) > > } > > > > static void > > -send_cursor(unsigned width, unsigned height, int hotspot_x, int hotspot_y, > > +send_cursor(StreamPort &stream_port, unsigned width, unsigned height, int > > hotspot_x, int hotspot_y, > > std::function<void(uint32_t *)> fill_cursor) > > { > > if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH || height >= > > STREAM_MSG_CURSOR_SET_MAX_HEIGHT) { > > @@ -294,11 +287,10 @@ send_cursor(unsigned width, unsigned height, int > > hotspot_x, int hotspot_y, > > uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg.data); > > fill_cursor(pixels); > > > > - std::lock_guard<std::mutex> stream_guard(stream_mtx); > > - write_all(streamfd, msg.get(), cursor_size); > > + stream_port.write(msg.get(), cursor_size); > > } > > > > -static void cursor_changes(Display *display, int event_base) > > +static void cursor_changes(StreamPort *stream_port, Display *display, int > > event_base) > > { > > unsigned long last_serial = 0; > > > > @@ -323,26 +315,18 @@ static void cursor_changes(Display *display, int > > event_base) > > for (unsigned i = 0; i < cursor->width * cursor->height; ++i) > > pixels[i] = cursor->pixels[i]; > > }; > > - send_cursor(cursor->width, cursor->height, cursor->xhot, > > cursor->yhot, fill_cursor); > > + send_cursor(*stream_port, cursor->width, cursor->height, > > cursor->xhot, cursor->yhot, fill_cursor); > > } > > } 
> > > > static void > > -do_capture(const char *streamport, FILE *f_log) > > +do_capture(StreamPort &stream_port, FILE *f_log) > > { > > - streamfd = open(streamport, O_RDWR | O_NONBLOCK); > > - if (streamfd < 0) { > > - throw std::runtime_error("failed to open the streaming device (" + > > - std::string(streamport) + "): " > > - + strerror(errno)); > > - } > > - > > unsigned int frame_count = 0; > > while (!quit_requested) { > > while (!quit_requested && !streaming_requested) { > > - if (read_command(true) < 0) { > > - syslog(LOG_ERR, "FAILED to read command\n"); > > - goto done; > > + if (read_command(stream_port, true) < 0) { > > + throw std::runtime_error("FAILED to read command"); > > } > > } > > > > @@ -385,7 +369,7 @@ do_capture(const char *streamport, FILE *f_log) > > > > syslog(LOG_DEBUG, "wXh %uX%u codec=%u\n", width, height, > > codec); > > > > - spice_stream_send_format(width, height, codec); > > + spice_stream_send_format(stream_port, width, height, codec); > > } > > if (f_log) { > > if (log_binary) { > > @@ -398,32 +382,25 @@ do_capture(const char *streamport, FILE *f_log) > > } > > > > try { > > - spice_stream_send_frame(frame.buffer, frame.buffer_size); > > + spice_stream_send_frame(stream_port, frame.buffer, > > frame.buffer_size); > > } catch (const WriteError& e) { > > syslog(e); > > break; > > } > > > > //usleep(1); > > - if (read_command(false) < 0) { > > - syslog(LOG_ERR, "FAILED to read command\n"); > > - goto done; > > + if (read_command(stream_port, false) < 0) { > > + throw std::runtime_error("FAILED to read command"); > > } > > } > > } > > - > > -done: > > - if (streamfd >= 0) { > > - close(streamfd); > > - streamfd = -1; > > - } > > } > > > > #define arg_error(...) 
syslog(LOG_ERR, ## __VA_ARGS__); > > > > int main(int argc, char* argv[]) > > { > > - const char *streamport = "/dev/virtio-ports/org.spice-space.stream.0"; > > + const char *stream_port_name = > > "/dev/virtio-ports/org.spice-space.stream.0"; > > int opt; > > const char *log_filename = NULL; > > int logmask = LOG_UPTO(LOG_WARNING); > > @@ -454,7 +431,7 @@ int main(int argc, char* argv[]) > > pluginsdir = optarg; > > break; > > case 'p': > > - streamport = optarg; > > + stream_port_name = optarg; > > break; > > case 'c': { > > char *p = strchr(optarg, '='); > > @@ -512,12 +489,15 @@ int main(int argc, char* argv[]) > > Window rootwindow = DefaultRootWindow(display); > > XFixesSelectCursorInput(display, rootwindow, > > XFixesDisplayCursorNotifyMask); > > > > - std::thread cursor_th(cursor_changes, display, event_base); > > - cursor_th.detach(); > > - > > int ret = EXIT_SUCCESS; > > + > > try { > > - do_capture(streamport, f_log); > > + StreamPort stream_port(stream_port_name); > > + > > + std::thread cursor_th(cursor_changes, &stream_port, display, > > event_base); > > + cursor_th.detach(); > > + > > + do_capture(stream_port, f_log); > > } > > catch (std::exception &err) { > > syslog(LOG_ERR, "%s\n", err.what()); > > diff --git a/src/stream-port.cpp b/src/stream-port.cpp > > index ee85179..3cd4753 100644 > > --- a/src/stream-port.cpp > > +++ b/src/stream-port.cpp > > @@ -8,6 +8,7 @@ > > #include "error.hpp" > > > > #include <errno.h> > > +#include <fcntl.h> > > #include <poll.h> > > #include <string.h> > > #include <syslog.h> > > @@ -18,6 +19,30 @@ > > namespace spice { > > namespace streaming_agent { > > > > +StreamPort::StreamPort(const std::string &port_name) : > > fd(open(port_name.c_str(), O_RDWR | O_NONBLOCK)) > > +{ > > + if (fd < 0) { > > + throw IOError("Failed to open the streaming device \"" + port_name + > > "\"", errno); > > + } > > +} > > + > > +StreamPort::~StreamPort() > > +{ > > + close(fd); > > +} > > + > > +void StreamPort::read(void *buf, size_t 
len) > > +{ > > + std::lock_guard<std::mutex> guard(mutex); > > + read_all(fd, buf, len); > > +} > > + > > +void StreamPort::write(const void *buf, size_t len) > > +{ > > + std::lock_guard<std::mutex> guard(mutex); > > + write_all(fd, buf, len); > > +} > > + > > void read_all(int fd, void *buf, size_t len) > > { > > while (len > 0) { > > diff --git a/src/stream-port.hpp b/src/stream-port.hpp > > index b2d8352..9187cf5 100644 > > --- a/src/stream-port.hpp > > +++ b/src/stream-port.hpp > > @@ -8,11 +8,25 @@ > > #define SPICE_STREAMING_AGENT_STREAM_PORT_HPP > > > > #include <cstddef> > > +#include <string> > > +#include <mutex> > > > > > > namespace spice { > > namespace streaming_agent { > > > > +class StreamPort { > > +public: > > + StreamPort(const std::string &port_name); > > + ~StreamPort(); > > + > > + void read(void *buf, size_t len); > > + void write(const void *buf, size_t len); > > + > > + int fd; > > + std::mutex mutex; > > +}; > > + > > void read_all(int fd, void *buf, size_t len); > > void write_all(int fd, const void *buf, size_t len); > > _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel