> This is only part of the message corruption solution. > The other part is fixing virtio-serial / spice-qemu-char throttling > code. > > -replace write_[lock/unlock/completion] calls with > [new/enqueue]_message > -remove clipboard specific _out_msg_* class members > -remove ugly loop - while (a->_out_msg && a->write_clipboard()); > -add _message_mutex for message queue > -fix pending_write race using _write_mutex > -TODO: enqueue large message without dividing it to chunks in advance > ACK. > rhbz #846427 > --- > vdagent/vdagent.cpp | 186 > +++++++++++++++++++++++++-------------------------- > 1 files changed, 91 insertions(+), 95 deletions(-) > > diff --git a/vdagent/vdagent.cpp b/vdagent/vdagent.cpp > index b8bad44..3ffafe3 100644 > --- a/vdagent/vdagent.cpp > +++ b/vdagent/vdagent.cpp > @@ -94,10 +94,10 @@ private: > enum { CONTROL_STOP, CONTROL_DESKTOP_SWITCH }; > void set_control_event(int control_command); > void handle_control_event(); > - uint8_t* write_lock(DWORD bytes = 0); > - void write_unlock(DWORD bytes = 0); > + VDPipeMessage* new_message(DWORD bytes = 0); > + void enqueue_message(VDPipeMessage* msg); > bool write_message(uint32_t type, uint32_t size, void* data); > - bool write_clipboard(); > + bool write_clipboard(VDAgentMessage* msg, uint32_t size); > bool connect_pipe(); > bool send_input(); > void set_display_depth(uint32_t depth); > @@ -119,9 +119,6 @@ private: > HANDLE _clipboard_event; > VDAgentMessage* _in_msg; > uint32_t _in_msg_pos; > - VDAgentMessage* _out_msg; > - uint32_t _out_msg_pos; > - uint32_t _out_msg_size; > bool _pending_input; > bool _pending_write; > bool _running; > @@ -131,7 +128,9 @@ private: > VDPipeState _pipe_state; > mutex_t _write_mutex; > mutex_t _control_mutex; > + mutex_t _message_mutex; > std::queue<int> _control_queue; > + std::queue<VDPipeMessage*> _message_queue; > > bool _logon_desktop; > bool _display_setting_initialized; > @@ -167,9 +166,6 @@ VDAgent::VDAgent() > , _clipboard_event (NULL) > , _in_msg (NULL) > , _in_msg_pos (0) > - , _out_msg (NULL) > - , _out_msg_pos (0) > - , _out_msg_size (0) > , _pending_input (false) > , _pending_write (false) > , _running (false) > @@ -193,6 +189,7 @@ VDAgent::VDAgent() > ZeroMemory(&_pipe_state, sizeof(VDPipeState)); > MUTEX_INIT(_write_mutex); > MUTEX_INIT(_control_mutex); > + MUTEX_INIT(_message_mutex); > > _singleton = this; > } > @@ -538,7 +535,7 @@ bool > VDAgent::handle_mon_config(VDAgentMonitorsConfig* mon_config, > uint32_t port > } > > DWORD msg_size = VD_MESSAGE_HEADER_SIZE + sizeof(VDAgentReply); > - reply_pipe_msg = (VDPipeMessage*)write_lock(msg_size); > + reply_pipe_msg = new_message(msg_size); > if (!reply_pipe_msg) { > return false; > } > @@ -553,10 +550,7 @@ bool > VDAgent::handle_mon_config(VDAgentMonitorsConfig* mon_config, > uint32_t port > reply = (VDAgentReply*)reply_msg->data; > reply->type = VD_AGENT_MONITORS_CONFIG; > reply->error = display_count ? VD_AGENT_SUCCESS : > VD_AGENT_ERROR; > - write_unlock(msg_size); > - if (!_pending_write) { > - write_completion(0, 0, &_pipe_state.write.overlap); > - } > + enqueue_message(reply_pipe_msg); > return true; > } > > @@ -669,7 +663,7 @@ bool VDAgent::send_announce_capabilities(bool > request) > uint32_t internal_msg_size = sizeof(VDAgentAnnounceCapabilities) > + VD_AGENT_CAPS_BYTES; > > msg_size = VD_MESSAGE_HEADER_SIZE + internal_msg_size; > - caps_pipe_msg = (VDPipeMessage*)write_lock(msg_size); > + caps_pipe_msg = new_message(msg_size); > if (!caps_pipe_msg) { > return false; > } > @@ -694,10 +688,7 @@ bool VDAgent::send_announce_capabilities(bool > request) > for (uint32_t i = 0 ; i < caps_size; ++i) { > vd_printf("%X", caps->caps[i]); > } > - write_unlock(msg_size); > - if (!_pending_write) { > - write_completion(0, 0, &_pipe_state.write.overlap); > - } > + enqueue_message(caps_pipe_msg); > return true; > } > > @@ -750,11 +741,10 @@ bool > VDAgent::handle_display_config(VDAgentDisplayConfig* display_config, > uint32 > } > > msg_size = VD_MESSAGE_HEADER_SIZE + sizeof(VDAgentReply); > - reply_pipe_msg = (VDPipeMessage*)write_lock(msg_size); > + reply_pipe_msg = new_message(msg_size); > if (!reply_pipe_msg) { > return false; > } > - > reply_pipe_msg->type = VD_AGENT_COMMAND; > reply_pipe_msg->opaque = port; > reply_pipe_msg->size = sizeof(VDAgentMessage) + > sizeof(VDAgentReply); > @@ -766,10 +756,7 @@ bool > VDAgent::handle_display_config(VDAgentDisplayConfig* display_config, > uint32 > reply = (VDAgentReply*)reply_msg->data; > reply->type = VD_AGENT_DISPLAY_CONFIG; > reply->error = VD_AGENT_SUCCESS; > - write_unlock(msg_size); > - if (!_pending_write) { > - write_completion(0, 0, &_pipe_state.write.overlap); > - } > + enqueue_message(reply_pipe_msg); > return true; > } > > @@ -778,16 +765,13 @@ bool VDAgent::handle_control(VDPipeMessage* > msg) > switch (msg->type) { > case VD_AGENT_RESET: { > vd_printf("Agent reset"); > - VDPipeMessage* ack = > (VDPipeMessage*)write_lock(sizeof(VDPipeMessage)); > + VDPipeMessage* ack = new_message(sizeof(VDPipeMessage)); > if (!ack) { > return false; > } > ack->type = VD_AGENT_RESET_ACK; > ack->opaque = msg->opaque; > - write_unlock(sizeof(VDPipeMessage)); > - if (!_pending_write) { > - write_completion(0, 0, &_pipe_state.write.overlap); > - } > + enqueue_message(ack); > break; > } > case VD_AGENT_SESSION_LOGON: > @@ -816,30 +800,30 @@ bool VDAgent::handle_control(VDPipeMessage* > msg) > > //FIXME: division to max size chunks should NOT be here, but in the > service > // here we should write the max possible size to the pipe > -bool VDAgent::write_clipboard() > +bool VDAgent::write_clipboard(VDAgentMessage* msg, uint32_t size) > { > - ASSERT(_out_msg); > - DWORD n = MIN(sizeof(VDPipeMessage) + _out_msg_size - > _out_msg_pos, VD_AGENT_MAX_DATA_SIZE); > - VDPipeMessage* pipe_msg = (VDPipeMessage*)write_lock(n); > - if (!pipe_msg) { > - return false; > - } > - pipe_msg->type = VD_AGENT_COMMAND; > - pipe_msg->opaque = VDP_CLIENT_PORT; > - pipe_msg->size = n - sizeof(VDPipeMessage); > - memcpy(pipe_msg->data, (char*)_out_msg + _out_msg_pos, n - > sizeof(VDPipeMessage)); > - write_unlock(n); > - if (!_pending_write) { > - write_completion(0, 0, &_pipe_state.write.overlap); > - } > - _out_msg_pos += (n - sizeof(VDPipeMessage)); > - if (_out_msg_pos == _out_msg_size) { > - delete[] (uint8_t *)_out_msg; > - _out_msg = NULL; > - _out_msg_size = 0; > - _out_msg_pos = 0; > - } > - return true; > + uint32_t pos = 0; > + bool ret = true; > + > + ASSERT(msg && size); > + //FIXME: do it smarter - no loop, no memcopy > + MUTEX_LOCK(_message_mutex); > + while (pos < size) { > + DWORD n = MIN(sizeof(VDPipeMessage) + size - pos, > VD_AGENT_MAX_DATA_SIZE); > + VDPipeMessage* pipe_msg = new_message(n); > + if (!pipe_msg) { > + ret = false; > + break; > + } > + pipe_msg->type = VD_AGENT_COMMAND; > + pipe_msg->opaque = VDP_CLIENT_PORT; > + pipe_msg->size = n - sizeof(VDPipeMessage); > + memcpy(pipe_msg->data, (char*)msg + pos, n - > sizeof(VDPipeMessage)); > + enqueue_message(pipe_msg); > + pos += (n - sizeof(VDPipeMessage)); > + } > + MUTEX_UNLOCK(_message_mutex); > + return ret; > } > > bool VDAgent::write_message(uint32_t type, uint32_t size = 0, void* > data = NULL) > @@ -847,7 +831,7 @@ bool VDAgent::write_message(uint32_t type, > uint32_t size = 0, void* data = NULL) > VDPipeMessage* pipe_msg; > VDAgentMessage* msg; > > - pipe_msg = (VDPipeMessage*)write_lock(VD_MESSAGE_HEADER_SIZE + > size); > + pipe_msg = new_message(VD_MESSAGE_HEADER_SIZE + size); > if (!pipe_msg) { > return false; > } > @@ -862,10 +846,7 @@ bool VDAgent::write_message(uint32_t type, > uint32_t size = 0, void* data = NULL) > if (size && data) { > memcpy(msg->data, data, size); > } > - write_unlock(VD_MESSAGE_HEADER_SIZE + size); > - if (!_pending_write) { > - write_completion(0, 0, &_pipe_state.write.overlap); > - } > + enqueue_message(pipe_msg); > return true; > } > > @@ -993,6 +974,8 @@ bool > VDAgent::handle_clipboard_grab(VDAgentClipboardGrab* clipboard_grab, > uint32 > // VD_AGENT_CLIPBOARD_NONE and no data, so the client will know the > request failed. > bool VDAgent::handle_clipboard_request(VDAgentClipboardRequest* > clipboard_request) > { > + VDAgentMessage* msg; > + uint32_t msg_size; > UINT format; > HANDLE clip_data; > uint8_t* new_data = NULL; > @@ -1008,10 +991,6 @@ bool > VDAgent::handle_clipboard_request(VDAgentClipboardRequest* > clipboard_reques > vd_printf("Unsupported clipboard type %u", > clipboard_request->type); > return false; > } > - if (_out_msg) { > - vd_printf("clipboard change is already pending"); > - return false; > - } > if (!IsClipboardFormatAvailable(format) || > !OpenClipboard(_hwnd)) { > return false; > } > @@ -1047,14 +1026,13 @@ bool > VDAgent::handle_clipboard_request(VDAgentClipboardRequest* > clipboard_reques > CloseClipboard(); > return false; > } > - _out_msg_pos = 0; > - _out_msg_size = sizeof(VDAgentMessage) + > sizeof(VDAgentClipboard) + new_size; > - _out_msg = (VDAgentMessage*)new uint8_t[_out_msg_size]; > - _out_msg->protocol = VD_AGENT_PROTOCOL; > - _out_msg->type = VD_AGENT_CLIPBOARD; > - _out_msg->opaque = 0; > - _out_msg->size = (uint32_t)(sizeof(VDAgentClipboard) + > new_size); > - VDAgentClipboard* clipboard = (VDAgentClipboard*)_out_msg->data; > + msg_size = sizeof(VDAgentMessage) + sizeof(VDAgentClipboard) + > new_size; > + msg = (VDAgentMessage*)new uint8_t[msg_size]; > + msg->protocol = VD_AGENT_PROTOCOL; > + msg->type = VD_AGENT_CLIPBOARD; > + msg->opaque = 0; > + msg->size = (uint32_t)(sizeof(VDAgentClipboard) + new_size); > + VDAgentClipboard* clipboard = (VDAgentClipboard*)msg->data; > clipboard->type = clipboard_request->type; > > switch (clipboard_request->type) { > @@ -1070,7 +1048,8 @@ bool > VDAgent::handle_clipboard_request(VDAgentClipboardRequest* > clipboard_reques > break; > } > CloseClipboard(); > - write_clipboard(); > + write_clipboard(msg, msg_size); > + delete[] (uint8_t *)msg; > return true; > } > > @@ -1281,8 +1260,8 @@ VOID CALLBACK VDAgent::write_completion(DWORD > err, DWORD bytes, LPOVERLAPPED ove > { > VDAgent* a = _singleton; > VDPipeState* ps = &a->_pipe_state; > + DWORD size_left; > > - a->_pending_write = false; > if (!a->_running) { > return; > } > @@ -1291,40 +1270,57 @@ VOID CALLBACK VDAgent::write_completion(DWORD > err, DWORD bytes, LPOVERLAPPED ove > a->_running = false; > return; > } > - if (!a->write_lock()) { > - a->_running = false; > - return; > - } > + MUTEX_LOCK(a->_write_mutex); > ps->write.start += bytes; > if (ps->write.start == ps->write.end) { > ps->write.start = ps->write.end = 0; > - //DEBUG > - while (a->_out_msg && a->write_clipboard()); > - } else if (WriteFileEx(ps->pipe, ps->write.data + > ps->write.start, > - ps->write.end - ps->write.start, overlap, > write_completion)) { > - a->_pending_write = true; > + } > + > + MUTEX_LOCK(a->_message_mutex); > + size_left = sizeof(a->_pipe_state.write.data) - > a->_pipe_state.write.end; > + while (!a->_message_queue.empty()) { > + VDPipeMessage* msg = a->_message_queue.front(); > + DWORD size = sizeof(VDPipeMessage) + msg->size; > + > + if (size > size_left) { > + break; > + } > + a->_message_queue.pop(); > + memcpy(a->_pipe_state.write.data + a->_pipe_state.write.end, > msg, size); > + a->_pipe_state.write.end += size; > + size_left -= size; > + delete msg; > + } > + MUTEX_UNLOCK(a->_message_mutex); > + > + if (ps->write.start < ps->write.end) { > + if (WriteFileEx(ps->pipe, ps->write.data + ps->write.start, > + ps->write.end - ps->write.start, > overlap, write_completion)) { > + a->_pending_write = true; > + } else { > + vd_printf("WriteFileEx() failed: %lu", GetLastError()); > + a->_running = false; > + } > } else { > - vd_printf("WriteFileEx() failed: %lu", GetLastError()); > - a->_running = false; > + a->_pending_write = false; > } > - a->write_unlock(); > + MUTEX_UNLOCK(a->_write_mutex); > } > > -uint8_t* VDAgent::write_lock(DWORD bytes) > +VDPipeMessage* VDAgent::new_message(DWORD bytes) > { > - MUTEX_LOCK(_write_mutex); > - if (_pipe_state.write.end + bytes <= > sizeof(_pipe_state.write.data)) { > - return &_pipe_state.write.data[_pipe_state.write.end]; > - } else { > - MUTEX_UNLOCK(_write_mutex); > - vd_printf("write buffer is full"); > - return NULL; > - } > + return (VDPipeMessage*)(new char[bytes]); > } > > -void VDAgent::write_unlock(DWORD bytes) > +void VDAgent::enqueue_message(VDPipeMessage* msg) > { > - _pipe_state.write.end += bytes; > + MUTEX_LOCK(_message_mutex); > + _message_queue.push(msg); > + MUTEX_UNLOCK(_message_mutex); > + MUTEX_LOCK(_write_mutex); > + if (!_pending_write) { > + write_completion(0, 0, &_pipe_state.write.overlap); > + } > MUTEX_UNLOCK(_write_mutex); > } > > -- > 1.7.4.1 > > _______________________________________________ > Spice-devel mailing list > Spice-devel@xxxxxxxxxxxxxxxxxxxxx > http://lists.freedesktop.org/mailman/listinfo/spice-devel > _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx http://lists.freedesktop.org/mailman/listinfo/spice-devel