Re: [PATCH 1/3] vdagent: add message_queue for messages written to pipe

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



> This is only part of the message corruption solution.
> The other part is fixing virtio-serial / spice-qemu-char throttling
> code.
> 
> -replace write_[lock/unlock/completion] calls with
> [new/enqueue]_message
> -remove clipboard specific _out_msg_* class members
> -remove ugly loop - while (a->_out_msg && a->write_clipboard());
> -add _message_mutex for message queue
> -fix pending_write race using _write_mutex
> -TODO: enqueue large message without dividing it to chunks in advance
> 

ACK.

> rhbz #846427
> ---
>  vdagent/vdagent.cpp |  186
>  +++++++++++++++++++++++++--------------------------
>  1 files changed, 91 insertions(+), 95 deletions(-)
> 
> diff --git a/vdagent/vdagent.cpp b/vdagent/vdagent.cpp
> index b8bad44..3ffafe3 100644
> --- a/vdagent/vdagent.cpp
> +++ b/vdagent/vdagent.cpp
> @@ -94,10 +94,10 @@ private:
>      enum { CONTROL_STOP, CONTROL_DESKTOP_SWITCH };
>      void set_control_event(int control_command);
>      void handle_control_event();
> -    uint8_t* write_lock(DWORD bytes = 0);
> -    void write_unlock(DWORD bytes = 0);
> +    VDPipeMessage* new_message(DWORD bytes = 0);
> +    void enqueue_message(VDPipeMessage* msg);
>      bool write_message(uint32_t type, uint32_t size, void* data);
> -    bool write_clipboard();
> +    bool write_clipboard(VDAgentMessage* msg, uint32_t size);
>      bool connect_pipe();
>      bool send_input();
>      void set_display_depth(uint32_t depth);
> @@ -119,9 +119,6 @@ private:
>      HANDLE _clipboard_event;
>      VDAgentMessage* _in_msg;
>      uint32_t _in_msg_pos;
> -    VDAgentMessage* _out_msg;
> -    uint32_t _out_msg_pos;
> -    uint32_t _out_msg_size;
>      bool _pending_input;
>      bool _pending_write;
>      bool _running;
> @@ -131,7 +128,9 @@ private:
>      VDPipeState _pipe_state;
>      mutex_t _write_mutex;
>      mutex_t _control_mutex;
> +    mutex_t _message_mutex;
>      std::queue<int> _control_queue;
> +    std::queue<VDPipeMessage*> _message_queue;
>  
>      bool _logon_desktop;
>      bool _display_setting_initialized;
> @@ -167,9 +166,6 @@ VDAgent::VDAgent()
>      , _clipboard_event (NULL)
>      , _in_msg (NULL)
>      , _in_msg_pos (0)
> -    , _out_msg (NULL)
> -    , _out_msg_pos (0)
> -    , _out_msg_size (0)
>      , _pending_input (false)
>      , _pending_write (false)
>      , _running (false)
> @@ -193,6 +189,7 @@ VDAgent::VDAgent()
>      ZeroMemory(&_pipe_state, sizeof(VDPipeState));
>      MUTEX_INIT(_write_mutex);
>      MUTEX_INIT(_control_mutex);
> +    MUTEX_INIT(_message_mutex);
>  
>      _singleton = this;
>  }
> @@ -538,7 +535,7 @@ bool
> VDAgent::handle_mon_config(VDAgentMonitorsConfig* mon_config,
> uint32_t port
>      }
>  
>      DWORD msg_size = VD_MESSAGE_HEADER_SIZE + sizeof(VDAgentReply);
> -    reply_pipe_msg = (VDPipeMessage*)write_lock(msg_size);
> +    reply_pipe_msg = new_message(msg_size);
>      if (!reply_pipe_msg) {
>          return false;
>      }
> @@ -553,10 +550,7 @@ bool
> VDAgent::handle_mon_config(VDAgentMonitorsConfig* mon_config,
> uint32_t port
>      reply = (VDAgentReply*)reply_msg->data;
>      reply->type = VD_AGENT_MONITORS_CONFIG;
>      reply->error = display_count ? VD_AGENT_SUCCESS :
>      VD_AGENT_ERROR;
> -    write_unlock(msg_size);
> -    if (!_pending_write) {
> -        write_completion(0, 0, &_pipe_state.write.overlap);
> -    }
> +    enqueue_message(reply_pipe_msg);
>      return true;
>  }
>  
> @@ -669,7 +663,7 @@ bool VDAgent::send_announce_capabilities(bool
> request)
>      uint32_t internal_msg_size = sizeof(VDAgentAnnounceCapabilities)
>      + VD_AGENT_CAPS_BYTES;
>  
>      msg_size = VD_MESSAGE_HEADER_SIZE + internal_msg_size;
> -    caps_pipe_msg = (VDPipeMessage*)write_lock(msg_size);
> +    caps_pipe_msg = new_message(msg_size);
>      if (!caps_pipe_msg) {
>          return false;
>      }
> @@ -694,10 +688,7 @@ bool VDAgent::send_announce_capabilities(bool
> request)
>      for (uint32_t i = 0 ; i < caps_size; ++i) {
>          vd_printf("%X", caps->caps[i]);
>      }
> -    write_unlock(msg_size);
> -    if (!_pending_write) {
> -        write_completion(0, 0, &_pipe_state.write.overlap);
> -    }
> +    enqueue_message(caps_pipe_msg);
>      return true;
>  }
>  
> @@ -750,11 +741,10 @@ bool
> VDAgent::handle_display_config(VDAgentDisplayConfig* display_config,
> uint32
>      }
>  
>      msg_size = VD_MESSAGE_HEADER_SIZE + sizeof(VDAgentReply);
> -    reply_pipe_msg = (VDPipeMessage*)write_lock(msg_size);
> +    reply_pipe_msg = new_message(msg_size);
>      if (!reply_pipe_msg) {
>          return false;
>      }
> -
>      reply_pipe_msg->type = VD_AGENT_COMMAND;
>      reply_pipe_msg->opaque = port;
>      reply_pipe_msg->size = sizeof(VDAgentMessage) +
>      sizeof(VDAgentReply);
> @@ -766,10 +756,7 @@ bool
> VDAgent::handle_display_config(VDAgentDisplayConfig* display_config,
> uint32
>      reply = (VDAgentReply*)reply_msg->data;
>      reply->type = VD_AGENT_DISPLAY_CONFIG;
>      reply->error = VD_AGENT_SUCCESS;
> -    write_unlock(msg_size);
> -    if (!_pending_write) {
> -        write_completion(0, 0, &_pipe_state.write.overlap);
> -    }
> +    enqueue_message(reply_pipe_msg);
>      return true;
>  }
>  
> @@ -778,16 +765,13 @@ bool VDAgent::handle_control(VDPipeMessage*
> msg)
>      switch (msg->type) {
>      case VD_AGENT_RESET: {
>          vd_printf("Agent reset");
> -        VDPipeMessage* ack =
> (VDPipeMessage*)write_lock(sizeof(VDPipeMessage));
> +        VDPipeMessage* ack = new_message(sizeof(VDPipeMessage));
>          if (!ack) {
>              return false;
>          }
>          ack->type = VD_AGENT_RESET_ACK;
>          ack->opaque = msg->opaque;
> -        write_unlock(sizeof(VDPipeMessage));
> -        if (!_pending_write) {
> -            write_completion(0, 0, &_pipe_state.write.overlap);
> -        }
> +        enqueue_message(ack);
>          break;
>      }
>      case VD_AGENT_SESSION_LOGON:
> @@ -816,30 +800,30 @@ bool VDAgent::handle_control(VDPipeMessage*
> msg)
>  
>  //FIXME: division to max size chunks should NOT be here, but in the
>  service
>  //       here we should write the max possible size to the pipe
> -bool VDAgent::write_clipboard()
> +bool VDAgent::write_clipboard(VDAgentMessage* msg, uint32_t size)
>  {
> -    ASSERT(_out_msg);
> -    DWORD n = MIN(sizeof(VDPipeMessage) + _out_msg_size -
> _out_msg_pos, VD_AGENT_MAX_DATA_SIZE);
> -    VDPipeMessage* pipe_msg = (VDPipeMessage*)write_lock(n);
> -    if (!pipe_msg) {
> -        return false;
> -    }
> -    pipe_msg->type = VD_AGENT_COMMAND;
> -    pipe_msg->opaque = VDP_CLIENT_PORT;
> -    pipe_msg->size = n - sizeof(VDPipeMessage);
> -    memcpy(pipe_msg->data, (char*)_out_msg + _out_msg_pos, n -
> sizeof(VDPipeMessage));
> -    write_unlock(n);
> -    if (!_pending_write) {
> -        write_completion(0, 0, &_pipe_state.write.overlap);
> -    }
> -    _out_msg_pos += (n - sizeof(VDPipeMessage));
> -    if (_out_msg_pos == _out_msg_size) {
> -        delete[] (uint8_t *)_out_msg;
> -        _out_msg = NULL;
> -        _out_msg_size = 0;
> -        _out_msg_pos = 0;
> -    }
> -    return true;
> +    uint32_t pos = 0;
> +    bool ret = true;
> +
> +    ASSERT(msg && size);
> +    //FIXME: do it smarter - no loop, no memcopy
> +    MUTEX_LOCK(_message_mutex);
> +    while (pos < size) {
> +        DWORD n = MIN(sizeof(VDPipeMessage) + size - pos,
> VD_AGENT_MAX_DATA_SIZE);
> +        VDPipeMessage* pipe_msg = new_message(n);
> +        if (!pipe_msg) {
> +            ret = false;
> +            break;
> +        }
> +        pipe_msg->type = VD_AGENT_COMMAND;
> +        pipe_msg->opaque = VDP_CLIENT_PORT;
> +        pipe_msg->size = n - sizeof(VDPipeMessage);
> +        memcpy(pipe_msg->data, (char*)msg + pos, n -
> sizeof(VDPipeMessage));
> +        enqueue_message(pipe_msg);
> +        pos += (n - sizeof(VDPipeMessage));
> +    }
> +    MUTEX_UNLOCK(_message_mutex);
> +    return ret;
>  }
>  
>  bool VDAgent::write_message(uint32_t type, uint32_t size = 0, void*
>  data = NULL)
> @@ -847,7 +831,7 @@ bool VDAgent::write_message(uint32_t type,
> uint32_t size = 0, void* data = NULL)
>      VDPipeMessage* pipe_msg;
>      VDAgentMessage* msg;
>  
> -    pipe_msg = (VDPipeMessage*)write_lock(VD_MESSAGE_HEADER_SIZE +
> size);
> +    pipe_msg = new_message(VD_MESSAGE_HEADER_SIZE + size);
>      if (!pipe_msg) {
>          return false;
>      }
> @@ -862,10 +846,7 @@ bool VDAgent::write_message(uint32_t type,
> uint32_t size = 0, void* data = NULL)
>      if (size && data) {
>          memcpy(msg->data, data, size);
>      }
> -    write_unlock(VD_MESSAGE_HEADER_SIZE + size);
> -    if (!_pending_write) {
> -        write_completion(0, 0, &_pipe_state.write.overlap);
> -    }
> +    enqueue_message(pipe_msg);
>      return true;
>  }
>  
> @@ -993,6 +974,8 @@ bool
> VDAgent::handle_clipboard_grab(VDAgentClipboardGrab* clipboard_grab,
> uint32
>  // VD_AGENT_CLIPBOARD_NONE and no data, so the client will know the
>  request failed.
>  bool VDAgent::handle_clipboard_request(VDAgentClipboardRequest*
>  clipboard_request)
>  {
> +    VDAgentMessage* msg;
> +    uint32_t msg_size;
>      UINT format;
>      HANDLE clip_data;
>      uint8_t* new_data = NULL;
> @@ -1008,10 +991,6 @@ bool
> VDAgent::handle_clipboard_request(VDAgentClipboardRequest*
> clipboard_reques
>          vd_printf("Unsupported clipboard type %u",
>          clipboard_request->type);
>          return false;
>      }
> -    if (_out_msg) {
> -        vd_printf("clipboard change is already pending");
> -        return false;
> -    }
>      if (!IsClipboardFormatAvailable(format) ||
>      !OpenClipboard(_hwnd)) {
>          return false;
>      }
> @@ -1047,14 +1026,13 @@ bool
> VDAgent::handle_clipboard_request(VDAgentClipboardRequest*
> clipboard_reques
>          CloseClipboard();
>          return false;
>      }
> -    _out_msg_pos = 0;
> -    _out_msg_size = sizeof(VDAgentMessage) +
> sizeof(VDAgentClipboard) + new_size;
> -    _out_msg = (VDAgentMessage*)new uint8_t[_out_msg_size];
> -    _out_msg->protocol = VD_AGENT_PROTOCOL;
> -    _out_msg->type = VD_AGENT_CLIPBOARD;
> -    _out_msg->opaque = 0;
> -    _out_msg->size = (uint32_t)(sizeof(VDAgentClipboard) +
> new_size);
> -    VDAgentClipboard* clipboard = (VDAgentClipboard*)_out_msg->data;
> +    msg_size = sizeof(VDAgentMessage) + sizeof(VDAgentClipboard) +
> new_size;
> +    msg = (VDAgentMessage*)new uint8_t[msg_size];
> +    msg->protocol = VD_AGENT_PROTOCOL;
> +    msg->type = VD_AGENT_CLIPBOARD;
> +    msg->opaque = 0;
> +    msg->size = (uint32_t)(sizeof(VDAgentClipboard) + new_size);
> +    VDAgentClipboard* clipboard = (VDAgentClipboard*)msg->data;
>      clipboard->type = clipboard_request->type;
>  
>      switch (clipboard_request->type) {
> @@ -1070,7 +1048,8 @@ bool
> VDAgent::handle_clipboard_request(VDAgentClipboardRequest*
> clipboard_reques
>          break;
>      }
>      CloseClipboard();
> -    write_clipboard();
> +    write_clipboard(msg, msg_size);
> +    delete[] (uint8_t *)msg;
>      return true;
>  }
>  
> @@ -1281,8 +1260,8 @@ VOID CALLBACK VDAgent::write_completion(DWORD
> err, DWORD bytes, LPOVERLAPPED ove
>  {
>      VDAgent* a = _singleton;
>      VDPipeState* ps = &a->_pipe_state;
> +    DWORD size_left;
>  
> -    a->_pending_write = false;
>      if (!a->_running) {
>          return;
>      }
> @@ -1291,40 +1270,57 @@ VOID CALLBACK VDAgent::write_completion(DWORD
> err, DWORD bytes, LPOVERLAPPED ove
>          a->_running = false;
>          return;
>      }
> -    if (!a->write_lock()) {
> -        a->_running = false;
> -        return;
> -    }
> +    MUTEX_LOCK(a->_write_mutex);
>      ps->write.start += bytes;
>      if (ps->write.start == ps->write.end) {
>          ps->write.start = ps->write.end = 0;
> -        //DEBUG
> -        while (a->_out_msg && a->write_clipboard());
> -    } else if (WriteFileEx(ps->pipe, ps->write.data +
> ps->write.start,
> -                           ps->write.end - ps->write.start, overlap,
> write_completion)) {
> -        a->_pending_write = true;
> +    }
> +
> +    MUTEX_LOCK(a->_message_mutex);
> +    size_left = sizeof(a->_pipe_state.write.data) -
> a->_pipe_state.write.end;
> +    while (!a->_message_queue.empty()) {
> +        VDPipeMessage* msg = a->_message_queue.front();
> +        DWORD size = sizeof(VDPipeMessage) + msg->size;
> +
> +        if (size > size_left) {
> +            break;
> +        }
> +        a->_message_queue.pop();
> +        memcpy(a->_pipe_state.write.data + a->_pipe_state.write.end,
> msg, size);
> +        a->_pipe_state.write.end += size;
> +        size_left -= size;
> +        delete msg;
> +    }
> +    MUTEX_UNLOCK(a->_message_mutex);
> +
> +    if (ps->write.start < ps->write.end) {
> +        if (WriteFileEx(ps->pipe, ps->write.data + ps->write.start,
> +                               ps->write.end - ps->write.start,
> overlap, write_completion)) {
> +            a->_pending_write = true;
> +        } else {
> +            vd_printf("WriteFileEx() failed: %lu", GetLastError());
> +            a->_running = false;
> +        }
>      } else {
> -        vd_printf("WriteFileEx() failed: %lu", GetLastError());
> -        a->_running = false;
> +        a->_pending_write = false;
>      }
> -    a->write_unlock();
> +    MUTEX_UNLOCK(a->_write_mutex);
>  }
>  
> -uint8_t* VDAgent::write_lock(DWORD bytes)
> +VDPipeMessage* VDAgent::new_message(DWORD bytes)
>  {
> -    MUTEX_LOCK(_write_mutex);
> -    if (_pipe_state.write.end + bytes <=
> sizeof(_pipe_state.write.data)) {
> -        return &_pipe_state.write.data[_pipe_state.write.end];
> -    } else {
> -        MUTEX_UNLOCK(_write_mutex);
> -        vd_printf("write buffer is full");
> -        return NULL;
> -    }
> +    return (VDPipeMessage*)(new char[bytes]);
>  }
>  
> -void VDAgent::write_unlock(DWORD bytes)
> +void VDAgent::enqueue_message(VDPipeMessage* msg)
>  {
> -    _pipe_state.write.end += bytes;
> +    MUTEX_LOCK(_message_mutex);
> +    _message_queue.push(msg);
> +    MUTEX_UNLOCK(_message_mutex);
> +    MUTEX_LOCK(_write_mutex);
> +    if (!_pending_write) {
> +        write_completion(0, 0, &_pipe_state.write.overlap);
> +    }
>      MUTEX_UNLOCK(_write_mutex);
>  }
>  
> --
> 1.7.4.1
> 
> _______________________________________________
> Spice-devel mailing list
> Spice-devel@xxxxxxxxxxxxxxxxxxxxx
> http://lists.freedesktop.org/mailman/listinfo/spice-devel
> 
_______________________________________________
Spice-devel mailing list
Spice-devel@xxxxxxxxxxxxxxxxxxxxx
http://lists.freedesktop.org/mailman/listinfo/spice-devel


[Index of Archives]     [Linux ARM Kernel]     [Linux ARM]     [Linux Omap]     [Fedora ARM]     [IETF Announce]     [Security]     [Bugtraq]     [Linux]     [Linux OMAP]     [Linux MIPS]     [ECOS]     [Asterisk Internet PBX]     [Linux API]     [Monitors]