Re: [server PATCH v3] LZ4 compression is now available at the Spicevmc channel

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Hey,

On 04/30/2016 12:32 PM, Victor Toso wrote:
Hey,

On Wed, Apr 27, 2016 at 07:03:21PM +0300, Snir Sheriber wrote:
Compressed message type is CompressedData which contains compression
type (1 byte) followed by the uncompressed data size (4 bytes) followed
by the compressed data size (4 bytes) followed by the compressed data

If SPICE_USBREDIR_CAP_DATA_COMPRESS_LZ4 capability is available &&
data_size > COMPRESS_THRESHOLD data will be sent compressed otherwise
data will be sent uncompressed (also if compression has failed)
---
  server/spicevmc.c | 128 ++++++++++++++++++++++++++++++++++++++++++++++++++----
  1 file changed, 119 insertions(+), 9 deletions(-)

diff --git a/server/spicevmc.c b/server/spicevmc.c
index aa6a0ed..cd0140f 100644
--- a/server/spicevmc.c
+++ b/server/spicevmc.c
@@ -34,6 +34,9 @@
  #include "red-channel.h"
  #include "reds.h"
  #include "migration-protocol.h"
+#ifdef USE_LZ4
+#include <lz4.h>
+#endif

  /* todo: add flow control. i.e.,
   * (a) limit the tokens available for the client
@@ -41,10 +44,13 @@
   */
  /* 64K should be enough for all but the largest writes + 32 bytes hdr */
  #define BUF_SIZE (64 * 1024 + 32)
+#define COMPRESS_THRESHOLD 1000

  typedef struct SpiceVmcPipeItem {
      PipeItem base;

+    SpiceDataCompressionType type;
+    uint32_t uncompressed_data_size;
      /* writes which don't fit this will get split, this is not a problem */
      uint8_t buf[BUF_SIZE];
      uint32_t buf_used;
@@ -105,6 +111,17 @@ enum {
      PIPE_ITEM_TYPE_PORT_EVENT,
  };

+static uint8_t *spicevmc_red_channel_alloc_msg_rcv_buf(RedChannelClient *rcc,
+                                                       uint16_t type,
+                                                       uint32_t size);
+
+static void spicevmc_red_channel_release_msg_rcv_buf(RedChannelClient *rcc,
+                                                     uint16_t type,
+                                                     uint32_t size,
+                                                     uint8_t *msg);
+
+
+
Two extra blank lines here.

  static PipeItem *spicevmc_chardev_read_msg_from_dev(SpiceCharDeviceInstance *sin,
                                                      void *opaque)
  {
@@ -121,6 +138,7 @@ static PipeItem *spicevmc_chardev_read_msg_from_dev(SpiceCharDeviceInstance *sin
if (!state->pipe_item) {
          msg_item = spice_new0(SpiceVmcPipeItem, 1);
+        msg_item->type = SPICE_DATA_COMPRESSION_TYPE_NONE;
          pipe_item_init(&msg_item->base, PIPE_ITEM_TYPE_SPICEVMC_DATA);
      } else {
          spice_assert(state->pipe_item->buf_used == 0);
@@ -132,6 +150,37 @@ static PipeItem *spicevmc_chardev_read_msg_from_dev(SpiceCharDeviceInstance *sin
                    sizeof(msg_item->buf));
      if (n > 0) {
          spice_debug("read from dev %d", n);
+#ifdef USE_LZ4
+        SpiceVmcPipeItem *msg_item_compressed;
+        int bound, compressed_data_count;
+
+        if (n > COMPRESS_THRESHOLD &&
+            red_channel_test_remote_cap(&state->channel,
+					SPICE_SPICEVMC_CAP_DATA_COMPRESS_LZ4) &&
+            ((bound = LZ4_compressBound(n)) != 0)) {
+            if (bound < BUF_SIZE){
+                msg_item_compressed = spice_new0(SpiceVmcPipeItem, 1);
+                pipe_item_init(&msg_item_compressed->base, PIPE_ITEM_TYPE_SPICEVMC_DATA);
+                compressed_data_count = LZ4_compress_default((char*)&msg_item->buf,
+							     (char*)&msg_item_compressed->buf,
+							     n,
+							     bound);
+
+                if (compressed_data_count < 1) {/*LZ4 compression failed-fallback a non-compressed data is to be sent*/
+                    spice_warning("Compress Error");
+                    free(msg_item_compressed);
+                } else {
+                    msg_item_compressed->type = SPICE_DATA_COMPRESSION_TYPE_LZ4;
+                    msg_item_compressed->uncompressed_data_size = n;
+                    msg_item_compressed->buf_used = compressed_data_count;
+                    free(msg_item);
+                    return (PipeItem *)msg_item_compressed;
+                }
+            }
+        }
+#endif
You should do something similar to what you did in spice-gtk with
try_write_compress_LZ4. The code is much clearer that way. Also, consider
checking whether the host machine is also the client machine, as we
probably don't want to compress data in that situation.


+        msg_item->uncompressed_data_size = 0;
+
          msg_item->buf_used = n;
          return (PipeItem *)msg_item;
      } else {
@@ -278,10 +327,10 @@ static int spicevmc_channel_client_handle_migrate_data(RedChannelClient *rcc,
      return red_char_device_restore(state->chardev, &mig_data->base);
  }
-static int spicevmc_red_channel_client_handle_message(RedChannelClient *rcc,
-                                                      uint16_t type,
+static int spicevmc_red_channel_client_handle_message_parsed(RedChannelClient *rcc,
                                                        uint32_t size,
-                                                      uint8_t *msg)
+                                                      uint16_t type,
+                                                      void *msg)
I don't follow why you are changing the name of the function and the
type of *msg
Because the first bytes of the compressed msg (the compression info) are laid out as 1 byte | 4 bytes | 4 bytes | ..., it must be parsed (using the generated function which is being called by this function) so that it is extracted correctly; otherwise the alignment is messed up.

  {
      SpiceVmcState *state;
      SpiceCharDeviceInterface *sif;
@@ -296,16 +345,54 @@ static int spicevmc_red_channel_client_handle_message(RedChannelClient *rcc,
          red_char_device_write_buffer_add(state->chardev, state->recv_from_client_buf);
          state->recv_from_client_buf = NULL;
          break;
+    case SPICE_MSGC_SPICEVMC_COMPRESSED_DATA: {
+        /*NOTE: msg free by free() (when cb to spicevmc_red_channel_release_msg_rcv_buf
+        * with the compressed msg type), decompressed is free by the char-device */
+        uint32_t decompressed_size;
+        char* decompressed;
+        SpiceMsgCompressedData *compressed_data_msg = (SpiceMsgCompressedData*)msg;
+
+        decompressed = (char*)spicevmc_red_channel_alloc_msg_rcv_buf(rcc,SPICE_MSGC_SPICEVMC_DATA,
+								     compressed_data_msg->uncompressed_size);
+        switch (compressed_data_msg->type) {
+#ifdef USE_LZ4
+        case SPICE_DATA_COMPRESSION_TYPE_LZ4:
+            decompressed_size = LZ4_decompress_safe ((char*)compressed_data_msg->compressed_data,
+                                                     decompressed,
+                                                     compressed_data_msg->compressed_size,
+                                                     compressed_data_msg->uncompressed_size);
+            break;
+#endif
+        default:
+            spice_warning("Invalid Compression Type");
+            spicevmc_red_channel_release_msg_rcv_buf(rcc, SPICE_MSGC_SPICEVMC_DATA,
+						     compressed_data_msg->uncompressed_size,
+						     (uint8_t*)decompressed);
+            return FALSE;
+        }
+        if (decompressed_size != compressed_data_msg->uncompressed_size) {
+            spice_warning("Decompression Error");
+            spicevmc_red_channel_release_msg_rcv_buf(rcc, SPICE_MSGC_SPICEVMC_DATA,
+						     compressed_data_msg->uncompressed_size,
+						     (uint8_t*)decompressed);
+            return FALSE;
+        }
+        spice_assert(state->recv_from_client_buf->buf == (uint8_t*)decompressed);
+        state->recv_from_client_buf->buf_used = decompressed_size;
+        red_char_device_write_buffer_add(state->chardev, state->recv_from_client_buf);
+        state->recv_from_client_buf = NULL;
+        break;
+    }
I would prefer something similar to try_handle_compressed_msg() on your
spice-gtk patch. That way, all bits related to the decompression are
handled there.

      case SPICE_MSGC_PORT_EVENT:
          if (size != sizeof(uint8_t)) {
              spice_warning("bad port event message size");
              return FALSE;
          }
          if (sif->base.minor_version >= 2 && sif->event != NULL)
-            sif->event(state->chardev_sin, *msg);
+            sif->event(state->chardev_sin, *(uint8_t*)msg);
          break;
      default:
-        return red_channel_client_handle_message(rcc, size, type, msg);
+        return red_channel_client_handle_message(rcc, size, type, (uint8_t*)msg);
      }
Hopefully these casts above won't be needed if we keep msg as uint8_t*

return TRUE;
@@ -371,8 +458,27 @@ static void spicevmc_red_channel_send_data(RedChannelClient *rcc,
  {
      SpiceVmcPipeItem *i = SPICE_CONTAINEROF(item, SpiceVmcPipeItem, base);
- red_channel_client_init_send_data(rcc, SPICE_MSG_SPICEVMC_DATA, item);
-    spice_marshaller_add_ref(m, i->buf, i->buf_used);
+    switch (i->type){
+    case SPICE_DATA_COMPRESSION_TYPE_NONE:
+        red_channel_client_init_send_data(rcc, SPICE_MSG_SPICEVMC_DATA, item);
+        spice_marshaller_add_ref(m, i->buf, i->buf_used);
+        break;
+    case SPICE_DATA_COMPRESSION_TYPE_LZ4: {
+        SpiceMsgCompressedData compressed_msg;
+
+        red_channel_client_init_send_data(rcc, SPICE_MSG_SPICEVMC_COMPRESSED_DATA, item);
+        compressed_msg.type = SPICE_DATA_COMPRESSION_TYPE_LZ4;
+        compressed_msg.uncompressed_size = i->uncompressed_data_size;
+        compressed_msg.compressed_size = i->buf_used;
+
+        spice_marshall_SpiceMsgCompressedData(m, &compressed_msg);
+        spice_marshaller_add_ref(m, i->buf, i->buf_used);
+        break;
+     }
+     default:
+         spice_warning("Invalid Compression Type");
I think g_assert_not_reached() fits here, as we should never be sending
a wrong message type.

+    }
+
  }
static void spicevmc_red_channel_send_migrate_data(RedChannelClient *rcc,
@@ -518,16 +624,20 @@ RedCharDevice *spicevmc_device_connect(RedsState *reds,
      channel_cbs.handle_migrate_flush_mark = spicevmc_channel_client_handle_migrate_flush_mark;
      channel_cbs.handle_migrate_data = spicevmc_channel_client_handle_migrate_data;
- state = (SpiceVmcState*)red_channel_create(sizeof(SpiceVmcState), reds,
+    state = (SpiceVmcState*)red_channel_create_parser(sizeof(SpiceVmcState), reds,
Why is this change necessary?

                                     reds_get_core_interface(reds), channel_type, id[channel_type]++,
                                     FALSE /* handle_acks */,
-                                   spicevmc_red_channel_client_handle_message,
+                                   spice_get_client_channel_parser(SPICE_CHANNEL_USBREDIR, NULL),
+                                   spicevmc_red_channel_client_handle_message_parsed,
                                     &channel_cbs,
                                     SPICE_MIGRATE_NEED_FLUSH | SPICE_MIGRATE_NEED_DATA_TRANSFER);
      red_channel_init_outgoing_messages_window(&state->channel);

      client_cbs.connect = spicevmc_connect;
      red_channel_register_client_cbs(&state->channel, &client_cbs, NULL);
+#ifdef USE_LZ4
+    red_channel_set_cap(&state->channel, SPICE_SPICEVMC_CAP_DATA_COMPRESS_LZ4);
+#endif

      state->chardev = red_char_device_spicevmc_new(sin, reds, state);
      state->chardev_sin = sin;
Thanks for your patches!

Next time, please send all four patches threaded, as that makes it easier to
review/apply them. (git send-email *.patch --to=... should do that)

Reviewed-by: Victor Toso <victortoso@xxxxxxxxxx>
Snir.
_______________________________________________
Spice-devel mailing list
Spice-devel@xxxxxxxxxxxxxxxxxxxxx
https://lists.freedesktop.org/mailman/listinfo/spice-devel




[Index of Archives]     [Linux ARM Kernel]     [Linux ARM]     [Linux Omap]     [Fedora ARM]     [IETF Annouce]     [Security]     [Bugtraq]     [Linux]     [Linux OMAP]     [Linux MIPS]     [ECOS]     [Asterisk Internet PBX]     [Linux API]     [Monitors]