This is only part of the message corruption solution. The other part is fixing virtio-serial / spice-qemu-char throttling code. -replace write_[lock/unlock/completion] calls with [new/enqueue]_message -remove clipboard specific _out_msg_* class members -remove ugly loop - while (a->_out_msg && a->write_clipboard()); -add _message_mutex for message queue -fix pending_write race using _write_mutex -TODO: enqueue large message without dividing it into chunks in advance rhbz #846427 --- vdagent/vdagent.cpp | 186 +++++++++++++++++++++++++-------------------------- 1 files changed, 91 insertions(+), 95 deletions(-) diff --git a/vdagent/vdagent.cpp b/vdagent/vdagent.cpp index b8bad44..3ffafe3 100644 --- a/vdagent/vdagent.cpp +++ b/vdagent/vdagent.cpp @@ -94,10 +94,10 @@ private: enum { CONTROL_STOP, CONTROL_DESKTOP_SWITCH }; void set_control_event(int control_command); void handle_control_event(); - uint8_t* write_lock(DWORD bytes = 0); - void write_unlock(DWORD bytes = 0); + VDPipeMessage* new_message(DWORD bytes = 0); + void enqueue_message(VDPipeMessage* msg); bool write_message(uint32_t type, uint32_t size, void* data); - bool write_clipboard(); + bool write_clipboard(VDAgentMessage* msg, uint32_t size); bool connect_pipe(); bool send_input(); void set_display_depth(uint32_t depth); @@ -119,9 +119,6 @@ private: HANDLE _clipboard_event; VDAgentMessage* _in_msg; uint32_t _in_msg_pos; - VDAgentMessage* _out_msg; - uint32_t _out_msg_pos; - uint32_t _out_msg_size; bool _pending_input; bool _pending_write; bool _running; @@ -131,7 +128,9 @@ private: VDPipeState _pipe_state; mutex_t _write_mutex; mutex_t _control_mutex; + mutex_t _message_mutex; std::queue<int> _control_queue; + std::queue<VDPipeMessage*> _message_queue; bool _logon_desktop; bool _display_setting_initialized; @@ -167,9 +166,6 @@ VDAgent::VDAgent() , _clipboard_event (NULL) , _in_msg (NULL) , _in_msg_pos (0) - , _out_msg (NULL) - , _out_msg_pos (0) - , _out_msg_size (0) , _pending_input (false) , _pending_write
(false) , _running (false) @@ -193,6 +189,7 @@ VDAgent::VDAgent() ZeroMemory(&_pipe_state, sizeof(VDPipeState)); MUTEX_INIT(_write_mutex); MUTEX_INIT(_control_mutex); + MUTEX_INIT(_message_mutex); _singleton = this; } @@ -538,7 +535,7 @@ bool VDAgent::handle_mon_config(VDAgentMonitorsConfig* mon_config, uint32_t port } DWORD msg_size = VD_MESSAGE_HEADER_SIZE + sizeof(VDAgentReply); - reply_pipe_msg = (VDPipeMessage*)write_lock(msg_size); + reply_pipe_msg = new_message(msg_size); if (!reply_pipe_msg) { return false; } @@ -553,10 +550,7 @@ bool VDAgent::handle_mon_config(VDAgentMonitorsConfig* mon_config, uint32_t port reply = (VDAgentReply*)reply_msg->data; reply->type = VD_AGENT_MONITORS_CONFIG; reply->error = display_count ? VD_AGENT_SUCCESS : VD_AGENT_ERROR; - write_unlock(msg_size); - if (!_pending_write) { - write_completion(0, 0, &_pipe_state.write.overlap); - } + enqueue_message(reply_pipe_msg); return true; } @@ -669,7 +663,7 @@ bool VDAgent::send_announce_capabilities(bool request) uint32_t internal_msg_size = sizeof(VDAgentAnnounceCapabilities) + VD_AGENT_CAPS_BYTES; msg_size = VD_MESSAGE_HEADER_SIZE + internal_msg_size; - caps_pipe_msg = (VDPipeMessage*)write_lock(msg_size); + caps_pipe_msg = new_message(msg_size); if (!caps_pipe_msg) { return false; } @@ -694,10 +688,7 @@ bool VDAgent::send_announce_capabilities(bool request) for (uint32_t i = 0 ; i < caps_size; ++i) { vd_printf("%X", caps->caps[i]); } - write_unlock(msg_size); - if (!_pending_write) { - write_completion(0, 0, &_pipe_state.write.overlap); - } + enqueue_message(caps_pipe_msg); return true; } @@ -750,11 +741,10 @@ bool VDAgent::handle_display_config(VDAgentDisplayConfig* display_config, uint32 } msg_size = VD_MESSAGE_HEADER_SIZE + sizeof(VDAgentReply); - reply_pipe_msg = (VDPipeMessage*)write_lock(msg_size); + reply_pipe_msg = new_message(msg_size); if (!reply_pipe_msg) { return false; } - reply_pipe_msg->type = VD_AGENT_COMMAND; reply_pipe_msg->opaque = port; reply_pipe_msg->size = 
sizeof(VDAgentMessage) + sizeof(VDAgentReply); @@ -766,10 +756,7 @@ bool VDAgent::handle_display_config(VDAgentDisplayConfig* display_config, uint32 reply = (VDAgentReply*)reply_msg->data; reply->type = VD_AGENT_DISPLAY_CONFIG; reply->error = VD_AGENT_SUCCESS; - write_unlock(msg_size); - if (!_pending_write) { - write_completion(0, 0, &_pipe_state.write.overlap); - } + enqueue_message(reply_pipe_msg); return true; } @@ -778,16 +765,13 @@ bool VDAgent::handle_control(VDPipeMessage* msg) switch (msg->type) { case VD_AGENT_RESET: { vd_printf("Agent reset"); - VDPipeMessage* ack = (VDPipeMessage*)write_lock(sizeof(VDPipeMessage)); + VDPipeMessage* ack = new_message(sizeof(VDPipeMessage)); if (!ack) { return false; } ack->type = VD_AGENT_RESET_ACK; ack->opaque = msg->opaque; - write_unlock(sizeof(VDPipeMessage)); - if (!_pending_write) { - write_completion(0, 0, &_pipe_state.write.overlap); - } + enqueue_message(ack); break; } case VD_AGENT_SESSION_LOGON: @@ -816,30 +800,30 @@ bool VDAgent::handle_control(VDPipeMessage* msg) //FIXME: division to max size chunks should NOT be here, but in the service // here we should write the max possible size to the pipe -bool VDAgent::write_clipboard() +bool VDAgent::write_clipboard(VDAgentMessage* msg, uint32_t size) { - ASSERT(_out_msg); - DWORD n = MIN(sizeof(VDPipeMessage) + _out_msg_size - _out_msg_pos, VD_AGENT_MAX_DATA_SIZE); - VDPipeMessage* pipe_msg = (VDPipeMessage*)write_lock(n); - if (!pipe_msg) { - return false; - } - pipe_msg->type = VD_AGENT_COMMAND; - pipe_msg->opaque = VDP_CLIENT_PORT; - pipe_msg->size = n - sizeof(VDPipeMessage); - memcpy(pipe_msg->data, (char*)_out_msg + _out_msg_pos, n - sizeof(VDPipeMessage)); - write_unlock(n); - if (!_pending_write) { - write_completion(0, 0, &_pipe_state.write.overlap); - } - _out_msg_pos += (n - sizeof(VDPipeMessage)); - if (_out_msg_pos == _out_msg_size) { - delete[] (uint8_t *)_out_msg; - _out_msg = NULL; - _out_msg_size = 0; - _out_msg_pos = 0; - } - return true; + 
uint32_t pos = 0; + bool ret = true; + + ASSERT(msg && size); + //FIXME: do it smarter - no loop, no memcopy + MUTEX_LOCK(_message_mutex); + while (pos < size) { + DWORD n = MIN(sizeof(VDPipeMessage) + size - pos, VD_AGENT_MAX_DATA_SIZE); + VDPipeMessage* pipe_msg = new_message(n); + if (!pipe_msg) { + ret = false; + break; + } + pipe_msg->type = VD_AGENT_COMMAND; + pipe_msg->opaque = VDP_CLIENT_PORT; + pipe_msg->size = n - sizeof(VDPipeMessage); + memcpy(pipe_msg->data, (char*)msg + pos, n - sizeof(VDPipeMessage)); + enqueue_message(pipe_msg); + pos += (n - sizeof(VDPipeMessage)); + } + MUTEX_UNLOCK(_message_mutex); + return ret; } bool VDAgent::write_message(uint32_t type, uint32_t size = 0, void* data = NULL) @@ -847,7 +831,7 @@ bool VDAgent::write_message(uint32_t type, uint32_t size = 0, void* data = NULL) VDPipeMessage* pipe_msg; VDAgentMessage* msg; - pipe_msg = (VDPipeMessage*)write_lock(VD_MESSAGE_HEADER_SIZE + size); + pipe_msg = new_message(VD_MESSAGE_HEADER_SIZE + size); if (!pipe_msg) { return false; } @@ -862,10 +846,7 @@ bool VDAgent::write_message(uint32_t type, uint32_t size = 0, void* data = NULL) if (size && data) { memcpy(msg->data, data, size); } - write_unlock(VD_MESSAGE_HEADER_SIZE + size); - if (!_pending_write) { - write_completion(0, 0, &_pipe_state.write.overlap); - } + enqueue_message(pipe_msg); return true; } @@ -993,6 +974,8 @@ bool VDAgent::handle_clipboard_grab(VDAgentClipboardGrab* clipboard_grab, uint32 // VD_AGENT_CLIPBOARD_NONE and no data, so the client will know the request failed. 
bool VDAgent::handle_clipboard_request(VDAgentClipboardRequest* clipboard_request) { + VDAgentMessage* msg; + uint32_t msg_size; UINT format; HANDLE clip_data; uint8_t* new_data = NULL; @@ -1008,10 +991,6 @@ bool VDAgent::handle_clipboard_request(VDAgentClipboardRequest* clipboard_reques vd_printf("Unsupported clipboard type %u", clipboard_request->type); return false; } - if (_out_msg) { - vd_printf("clipboard change is already pending"); - return false; - } if (!IsClipboardFormatAvailable(format) || !OpenClipboard(_hwnd)) { return false; } @@ -1047,14 +1026,13 @@ bool VDAgent::handle_clipboard_request(VDAgentClipboardRequest* clipboard_reques CloseClipboard(); return false; } - _out_msg_pos = 0; - _out_msg_size = sizeof(VDAgentMessage) + sizeof(VDAgentClipboard) + new_size; - _out_msg = (VDAgentMessage*)new uint8_t[_out_msg_size]; - _out_msg->protocol = VD_AGENT_PROTOCOL; - _out_msg->type = VD_AGENT_CLIPBOARD; - _out_msg->opaque = 0; - _out_msg->size = (uint32_t)(sizeof(VDAgentClipboard) + new_size); - VDAgentClipboard* clipboard = (VDAgentClipboard*)_out_msg->data; + msg_size = sizeof(VDAgentMessage) + sizeof(VDAgentClipboard) + new_size; + msg = (VDAgentMessage*)new uint8_t[msg_size]; + msg->protocol = VD_AGENT_PROTOCOL; + msg->type = VD_AGENT_CLIPBOARD; + msg->opaque = 0; + msg->size = (uint32_t)(sizeof(VDAgentClipboard) + new_size); + VDAgentClipboard* clipboard = (VDAgentClipboard*)msg->data; clipboard->type = clipboard_request->type; switch (clipboard_request->type) { @@ -1070,7 +1048,8 @@ bool VDAgent::handle_clipboard_request(VDAgentClipboardRequest* clipboard_reques break; } CloseClipboard(); - write_clipboard(); + write_clipboard(msg, msg_size); + delete[] (uint8_t *)msg; return true; } @@ -1281,8 +1260,8 @@ VOID CALLBACK VDAgent::write_completion(DWORD err, DWORD bytes, LPOVERLAPPED ove { VDAgent* a = _singleton; VDPipeState* ps = &a->_pipe_state; + DWORD size_left; - a->_pending_write = false; if (!a->_running) { return; } @@ -1291,40 +1270,57 @@ VOID 
CALLBACK VDAgent::write_completion(DWORD err, DWORD bytes, LPOVERLAPPED ove a->_running = false; return; } - if (!a->write_lock()) { - a->_running = false; - return; - } + MUTEX_LOCK(a->_write_mutex); ps->write.start += bytes; if (ps->write.start == ps->write.end) { ps->write.start = ps->write.end = 0; - //DEBUG - while (a->_out_msg && a->write_clipboard()); - } else if (WriteFileEx(ps->pipe, ps->write.data + ps->write.start, - ps->write.end - ps->write.start, overlap, write_completion)) { - a->_pending_write = true; + } + + MUTEX_LOCK(a->_message_mutex); + size_left = sizeof(a->_pipe_state.write.data) - a->_pipe_state.write.end; + while (!a->_message_queue.empty()) { + VDPipeMessage* msg = a->_message_queue.front(); + DWORD size = sizeof(VDPipeMessage) + msg->size; + + if (size > size_left) { + break; + } + a->_message_queue.pop(); + memcpy(a->_pipe_state.write.data + a->_pipe_state.write.end, msg, size); + a->_pipe_state.write.end += size; + size_left -= size; + delete msg; + } + MUTEX_UNLOCK(a->_message_mutex); + + if (ps->write.start < ps->write.end) { + if (WriteFileEx(ps->pipe, ps->write.data + ps->write.start, + ps->write.end - ps->write.start, overlap, write_completion)) { + a->_pending_write = true; + } else { + vd_printf("WriteFileEx() failed: %lu", GetLastError()); + a->_running = false; + } } else { - vd_printf("WriteFileEx() failed: %lu", GetLastError()); - a->_running = false; + a->_pending_write = false; } - a->write_unlock(); + MUTEX_UNLOCK(a->_write_mutex); } -uint8_t* VDAgent::write_lock(DWORD bytes) +VDPipeMessage* VDAgent::new_message(DWORD bytes) { - MUTEX_LOCK(_write_mutex); - if (_pipe_state.write.end + bytes <= sizeof(_pipe_state.write.data)) { - return &_pipe_state.write.data[_pipe_state.write.end]; - } else { - MUTEX_UNLOCK(_write_mutex); - vd_printf("write buffer is full"); - return NULL; - } + return (VDPipeMessage*)(new char[bytes]); } -void VDAgent::write_unlock(DWORD bytes) +void VDAgent::enqueue_message(VDPipeMessage* msg) { - 
_pipe_state.write.end += bytes; + MUTEX_LOCK(_message_mutex); + _message_queue.push(msg); + MUTEX_UNLOCK(_message_mutex); + MUTEX_LOCK(_write_mutex); + if (!_pending_write) { + write_completion(0, 0, &_pipe_state.write.overlap); + } MUTEX_UNLOCK(_write_mutex); } -- 1.7.4.1 _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx http://lists.freedesktop.org/mailman/listinfo/spice-devel