[RFC server 2/3] Sending LZ4 compressed msgs over selected channels

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



If channel number is included in the CHANNEL_COMPRESS environment
variable, LZ4 compression will be applied on the channel messages
if possible.

First LZ4 stream compression is trying to be applied, if message
size is too large, regular LZ4 compression is applied, in stream
compression mode messages are being saved sequentially in pre-allocated
buffer which will be utilized by the compression mechanism in the
following compressions
---
 server/red-channel-client.c | 147 ++++++++++++++++++++++++++++++++++++++++++++
 server/red-channel.c        |  35 +++++++++++
 2 files changed, 182 insertions(+)

diff --git a/server/red-channel-client.c b/server/red-channel-client.c
index 600a9f2..8655ced 100644
--- a/server/red-channel-client.c
+++ b/server/red-channel-client.c
@@ -94,6 +94,11 @@ typedef struct OutgoingMessageBuffer {
     int vec_size;
     int pos;
     int size;
+#ifdef USE_LZ4
+    LZ4_stream_t *lz4Stream;
+    char *stream_buf;
+    size_t stream_offset;
+#endif
 } OutgoingMessageBuffer;
 
 typedef struct IncomingMessageBuffer {
@@ -164,6 +169,10 @@ static void red_channel_client_clear_sent_item(RedChannelClient *rcc);
 static void red_channel_client_destroy_remote_caps(RedChannelClient* rcc);
 static void red_channel_client_initable_interface_init(GInitableIface *iface);
 static void red_channel_client_set_message_serial(RedChannelClient *channel, uint64_t);
+#ifdef USE_LZ4
+static int red_channel_client_test_local_common_cap(RedChannelClient *rcc, uint32_t cap);
+#endif
+
 
 /*
  * When an error occurs over a channel, we treat it as a warning
@@ -387,6 +396,12 @@ red_channel_client_finalize(GObject *object)
     }
 
     free(self->priv->incoming.stream_buf);
+
+    if(self->priv->outgoing.lz4Stream) {
+        LZ4_freeStream(self->priv->outgoing.lz4Stream);
+    }
+
+    free(self->priv->outgoing.stream_buf);
 #endif
 
     if (self->priv->send_data.main.marshaller) {
@@ -431,6 +446,12 @@ static void red_channel_client_constructed(GObject *object)
     self->priv->incoming.lz4StreamDecode = LZ4_createStreamDecode();
     self->priv->incoming.stream_buf = spice_malloc(STREAM_BUF_SIZE);
     self->priv->incoming.stream_offset = 0;
+    if (red_channel_client_test_local_common_cap(self, SPICE_COMMON_CAP_LZ4_COMPRESSION) &&
+        reds_stream_get_family(red_channel_client_get_stream(self)) != AF_UNIX) {
+        self->priv->outgoing.lz4Stream = LZ4_createStream();
+        self->priv->outgoing.stream_buf = spice_malloc(STREAM_BUF_SIZE);
+        self->priv->outgoing.stream_offset = 0;
+    }
 #endif
 }
 
@@ -738,6 +759,16 @@ static void red_channel_client_destroy_remote_caps(RedChannelClient* rcc)
     free(rcc->priv->remote_caps.caps);
 }
 
+#ifdef USE_LZ4
+static int red_channel_client_test_local_common_cap(RedChannelClient *rcc, uint32_t cap)
+{
+    RedChannel *channel = red_channel_client_get_channel(rcc);
+    const RedChannelCapabilities *caps = red_channel_get_local_capabilities(channel);
+
+    return test_capability(caps->common_caps, caps->num_common_caps, cap);
+}
+#endif
+
 int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap)
 {
     return test_capability(rcc->priv->remote_caps.common_caps,
@@ -1155,6 +1186,119 @@ static void red_channel_client_release_msg_buf(RedChannelClient *rcc,
     klass->release_recv_buf(rcc, type, size, msg);
 }
 
+#ifdef USE_LZ4
+static char* out_stream_get_ptr(OutgoingMessageBuffer *buffer)
+{
+    if(buffer->stream_buf)
+        return &buffer->stream_buf[buffer->stream_offset];
+    else
+        return NULL;
+}
+
+static void out_stream_update(OutgoingMessageBuffer *buffer, int size)
+{
+    /* Add size to the stream buffer offset & reset if needed
+     * so that place for new message is always available */
+    buffer->stream_offset += size;
+    if ((size_t)buffer->stream_offset >= STREAM_BUF_SIZE - COMPRESS_STREAM_MAX_MSG_SIZE)
+        buffer->stream_offset = 0;
+}
+
+/* Getting current message from the marshaller, linearize it, try to
+ * compress it and if compression is successful marshall the compressed
+ * message instead of the original one */
+static int red_channel_client_try_to_compress(RedChannelClient *rcc)
+{
+    uint8_t *data, type;
+    char *compressed_buf, *uncompressed_buf;
+    int  free_data;
+    size_t len;
+    int compressed_data_count;
+    int bound;
+    OutgoingMessageBuffer *buffer = &rcc->priv->outgoing;
+
+    if(buffer->stream_buf == NULL) {
+        /* Allocation failed or capability is disabled */
+        return FALSE;
+    }
+    if(!red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_LZ4_COMPRESSION)) {
+        /* Client doesn't support stream compression at this channel */
+        return FALSE;
+    }
+    if (buffer->size < COMPRESS_STREAM_MIN_THRESHOLD) {
+        /* size < threshold - data will not be compressed */
+        return FALSE;
+    }
+
+    data = spice_marshaller_linearize(rcc->priv->send_data.marshaller, 0, &len, &free_data);
+    bound = LZ4_compressBound(len);
+    compressed_buf = spice_malloc(bound);
+
+    if (buffer->size <= COMPRESS_STREAM_MAX_MSG_SIZE ) {
+        /* Use stream lz4 compression */
+        uncompressed_buf = out_stream_get_ptr(buffer);
+        memcpy(uncompressed_buf, data, len); /* TODO: Avoid that */
+
+        compressed_data_count = LZ4_compress_fast_continue(buffer->lz4Stream,
+                                                           uncompressed_buf,
+                                                           compressed_buf,
+                                                           len,
+                                                           bound,
+                                                           1);
+        type = SPICE_DATA_COMPRESSION_TYPE_STREAM_LZ4;
+    } else {
+        /* Use lz4 regular compression */
+        compressed_data_count = LZ4_compress_default((char*)data,
+                                                     compressed_buf,
+                                                     len,
+                                                     bound);
+        type = SPICE_DATA_COMPRESSION_TYPE_LZ4;
+    }
+
+    if (compressed_data_count > 0) {
+        SpiceMsgCompressedData compressed_msg = {
+            .type = type,
+            .uncompressed_size = len
+        };
+
+        if (free_data)
+            g_free(data);
+
+        /* Marshall compressed message FIXME: could be done differently? */
+        rcc->priv->send_data.size = 0;
+        red_channel_client_reset_send_data(rcc);
+        red_channel_client_init_send_data(rcc, SPICE_MSG_COMPRESSED_DATA);
+
+        spice_marshall_SpiceMsgCompressedData(rcc->priv->send_data.marshaller, &compressed_msg);
+        red_channel_client_reset_send_data(rcc);
+        red_channel_client_init_send_data(rcc, SPICE_MSG_COMPRESSED_DATA);
+
+        spice_marshall_SpiceMsgCompressedData(rcc->priv->send_data.marshaller, &compressed_msg);
+        spice_marshaller_add_by_ref_full(rcc->priv->send_data.marshaller,
+                                         (uint8_t*)compressed_buf,
+                                         compressed_data_count,
+                                         (spice_marshaller_item_free_func)free
+                                         ,NULL);
+
+        spice_marshaller_flush(rcc->priv->send_data.marshaller);
+        rcc->priv->send_data.size = spice_marshaller_get_total_size(rcc->priv->send_data.marshaller);
+        rcc->priv->send_data.header.set_msg_size(&rcc->priv->send_data.header,
+                                             rcc->priv->send_data.size -
+                                             rcc->priv->send_data.header.header_size);
+        rcc->priv->send_data.header.data = NULL;
+        buffer->size = red_channel_client_get_out_msg_size(rcc);
+
+        if(type == SPICE_DATA_COMPRESSION_TYPE_STREAM_LZ4) {
+            out_stream_update(buffer, len);
+        }
+        return TRUE;
+    }
+    /* Unsuccessful compression - free & fallback to sending the message uncompressed */
+    g_free(compressed_buf);
+    return FALSE;
+}
+#endif
+
 static void red_channel_client_handle_outgoing(RedChannelClient *rcc)
 {
     RedsStream *stream = rcc->priv->stream;
@@ -1171,6 +1315,9 @@ static void red_channel_client_handle_outgoing(RedChannelClient *rcc)
             return;
         }
     }
+#ifdef USE_LZ4
+    red_channel_client_try_to_compress(rcc); // return value is currently ignored - could use for stat
+#endif
 
     for (;;) {
         buffer->vec_size =
diff --git a/server/red-channel.c b/server/red-channel.c
index 8fe0d33..7122433 100644
--- a/server/red-channel.c
+++ b/server/red-channel.c
@@ -199,6 +199,37 @@ void red_channel_on_output(RedChannel *self, int n)
     stat_inc_counter(self->priv->out_bytes_counter, n);
 }
 
+#ifdef USE_LZ4
+static bool asked_for_compression(RedChannel *channel)
+{
+    /* Check if current channel was included by the user in
+     * the COMPRESS_CHANNELS environment variable. The format
+     * to ask for specific channel compression is as follows:
+     * COMPRESS_CHANNELS=<channel number>,<channel number>,<... */
+    const char* pChannels;
+
+    pChannels = g_getenv("COMPRESS_CHANNELS");
+    if (pChannels != NULL) {
+        uint32_t type;
+        char **result = NULL;
+        int i = 0;
+
+        g_object_get(channel, "channel-type", &type, NULL);
+        result = g_strsplit_set(pChannels,",.:;- ",-1);
+
+        while (result[i] != NULL) {
+            if (type == atoi(result[i])) {
+                g_strfreev(result);
+                return true;
+            }
+            i++;
+        }
+        g_strfreev(result);
+    }
+    return false;
+}
+#endif
+
 static void
 red_channel_constructed(GObject *object)
 {
@@ -209,6 +240,10 @@ red_channel_constructed(GObject *object)
     RedChannelClass *klass = RED_CHANNEL_GET_CLASS(self);
 
     G_OBJECT_CLASS(red_channel_parent_class)->constructed(object);
+#ifdef USE_LZ4
+    if (asked_for_compression(self))
+        red_channel_set_common_cap(self, SPICE_COMMON_CAP_LZ4_COMPRESSION);
+#endif
 
     spice_assert(klass->on_disconnect &&
                  klass->alloc_recv_buf && klass->release_recv_buf);
-- 
2.9.3

_______________________________________________
Spice-devel mailing list
Spice-devel@xxxxxxxxxxxxxxxxxxxxx
https://lists.freedesktop.org/mailman/listinfo/spice-devel




[Index of Archives]     [Linux ARM Kernel]     [Linux ARM]     [Linux Omap]     [Fedora ARM]     [IETF Annouce]     [Security]     [Bugtraq]     [Linux]     [Linux OMAP]     [Linux MIPS]     [ECOS]     [Asterisk Internet PBX]     [Linux API]     [Monitors]