If LZ4 lib exists, handle LZ4 compressed & stream compressed messages in any channel In stream compression mode decompressed messages are being saved sequentially in a pre-allocated buffer which will be utilized by the decompression mechanism in the following decompressions Update spice-common --- server/red-channel-client.c | 128 ++++++++++++++++++++++++++++++++++++++++++++ spice-common | 2 +- 2 files changed, 129 insertions(+), 1 deletion(-) diff --git a/server/red-channel-client.c b/server/red-channel-client.c index cd4b64e..600a9f2 100644 --- a/server/red-channel-client.c +++ b/server/red-channel-client.c @@ -31,6 +31,9 @@ #ifdef HAVE_LINUX_SOCKIOS_H #include <linux/sockios.h> /* SIOCOUTQ */ #endif +#ifdef USE_LZ4 +#include <lz4.h> +#endif #include <common/generated_server_marshallers.h> #include "red-channel-client.h" @@ -99,6 +102,11 @@ typedef struct IncomingMessageBuffer { uint32_t header_pos; uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf. uint32_t msg_pos; +#ifdef USE_LZ4 + LZ4_streamDecode_t *lz4StreamDecode; + char *stream_buf; + size_t stream_offset; +#endif } IncomingMessageBuffer; struct RedChannelClientPrivate @@ -209,6 +217,14 @@ enum ConnectivityState { CONNECTIVITY_STATE_DISCONNECTED, }; +#ifdef USE_LZ4 +enum { + COMPRESS_STREAM_MIN_THRESHOLD = 6, + COMPRESS_STREAM_MAX_MSG_SIZE = 1024 * 64, /* Maximum message size for LZ4 stream compression */ + STREAM_BUF_SIZE = 1024 * 64 + COMPRESS_STREAM_MAX_MSG_SIZE + 8 /* Recommended buffer size according to the api */ +}; +#endif + typedef struct RedEmptyMsgPipeItem { RedPipeItem base; int msg; @@ -365,6 +381,13 @@ red_channel_client_finalize(GObject *object) reds_stream_free(self->priv->stream); self->priv->stream = NULL; +#ifdef USE_LZ4 + if (self->priv->incoming.lz4StreamDecode) { + LZ4_freeStreamDecode(self->priv->incoming.lz4StreamDecode); + } + + free(self->priv->incoming.stream_buf); +#endif if (self->priv->send_data.main.marshaller) { spice_marshaller_destroy(self->priv->send_data.main.marshaller); @@ -404,6 +427,11 @@ static void red_channel_client_constructed(GObject *object) self->priv->is_mini_header = FALSE; } self->priv->incoming.header.data = self->priv->incoming.header_buf; +#ifdef USE_LZ4 + self->priv->incoming.lz4StreamDecode = LZ4_createStreamDecode(); + self->priv->incoming.stream_buf = spice_malloc(STREAM_BUF_SIZE); + self->priv->incoming.stream_offset = 0; +#endif } static void red_channel_client_class_init(RedChannelClientClass *klass) @@ -1233,6 +1261,94 @@ static uint8_t *red_channel_client_parse(RedChannelClient *rcc, uint8_t *message return parsed_message; } +#ifdef USE_LZ4 +static char* in_stream_get_ptr(IncomingMessageBuffer *buffer) +{ + if (buffer->stream_buf) { + return &buffer->stream_buf[buffer->stream_offset]; + } else { + return NULL; + } +} + +static void in_stream_update(IncomingMessageBuffer *buffer, int size) +{ + /* Add size to the stream buffer offset & reset if needed so that + * place for new message is always available */ + buffer->stream_offset += size; + if (buffer->stream_offset >= STREAM_BUF_SIZE - COMPRESS_STREAM_MAX_MSG_SIZE) { + buffer->stream_offset = 0; + } +} + +#endif + +static int red_channel_client_handle_compressed_msg(RedChannelClient *rcc, + SpiceMsgCompressedData *compressed_data_msg) +{ + IncomingMessageBuffer *buffer = &rcc->priv->incoming; + int decompressed_size; + uint32_t msg_size; + uint16_t msg_type; + char *decompressed = NULL; + + switch (compressed_data_msg->type) { +#ifdef USE_LZ4 + case SPICE_DATA_COMPRESSION_TYPE_STREAM_LZ4: { + if (!(decompressed = in_stream_get_ptr(buffer))) { + return FALSE; + } + decompressed_size = LZ4_decompress_safe_continue(buffer->lz4StreamDecode, + (char *)compressed_data_msg->compressed_data, + decompressed, + compressed_data_msg->compressed_size, + compressed_data_msg->uncompressed_size); + break; + } + case SPICE_DATA_COMPRESSION_TYPE_LZ4: { + if (!(decompressed = spice_malloc(compressed_data_msg->uncompressed_size))) { + return FALSE; + } + decompressed_size = LZ4_decompress_safe((char *)compressed_data_msg->compressed_data, + decompressed, + compressed_data_msg->compressed_size, + compressed_data_msg->uncompressed_size); + break; + } +#endif + default: + spice_warning("Invalid Compression Type"); + return FALSE; + } + int ret; + if (decompressed_size > 0 && decompressed_size == compressed_data_msg->uncompressed_size) { + /* Lay out decompressed message into incoming buffer */ + msg_size = buffer->header.get_msg_size(&buffer->header); + red_channel_client_release_msg_buf(rcc, SPICE_MSGC_COMPRESSED_DATA, msg_size, buffer->msg); + /* copy decompressed msg header to incoming */ + memcpy(buffer->header.data, decompressed, buffer->header.header_size); + msg_size = buffer->header.get_msg_size(&buffer->header); + msg_type = buffer->header.get_msg_type(&buffer->header); + /* copy decompressed msg data to incoming buffer */ + buffer->msg = red_channel_client_alloc_msg_buf(rcc, msg_type, msg_size); + memcpy(buffer->msg, decompressed + buffer->header.header_size, msg_size); /* TODO: Avoid that */ +#ifdef USE_LZ4 + if (compressed_data_msg->type == SPICE_DATA_COMPRESSION_TYPE_STREAM_LZ4) { + in_stream_update(buffer, decompressed_size); + } +#endif + ret = TRUE; + } else { + spice_warning("Decompression Failed"); + ret = FALSE; + } + + if (compressed_data_msg->type == SPICE_DATA_COMPRESSION_TYPE_LZ4) { + free(decompressed); + } + return ret; +} + // TODO: this implementation, as opposed to the old implementation in red_worker, // does many calls to red_peer_receive and through it cb_read, and thus avoids pointer // arithmetic for the case where a single cb_read could return multiple messages. But @@ -1304,6 +1420,7 @@ static void red_channel_client_handle_incoming(RedChannelClient *rcc) } } +try_parse: parsed = red_channel_client_parse(rcc, buffer->msg, msg_size, msg_type, @@ -1317,6 +1434,17 @@ static void red_channel_client_handle_incoming(RedChannelClient *rcc) red_channel_client_disconnect(rcc); return; } + if (msg_type == SPICE_MSGC_COMPRESSED_DATA) { + if (red_channel_client_handle_compressed_msg(rcc, (SpiceMsgCompressedData*)parsed)) { + msg_size = buffer->header.get_msg_size(&buffer->header); + msg_type = buffer->header.get_msg_type(&buffer->header); + if (parsed_free != NULL) + parsed_free(parsed); + goto try_parse; /* parse decopressed msg now */ + } else { + spice_warning("Decompression failed"); + } + } ret_handle = klass->handle_message(rcc, msg_type, parsed_size, parsed); if (parsed_free != NULL) { diff --git a/spice-common b/spice-common index 6439bec..cde4a8b 160000 --- a/spice-common +++ b/spice-common @@ -1 +1 @@ -Subproject commit 6439bec0deed0fcf251d4d77514b7c4a87384097 +Subproject commit cde4a8bb67330006ee26e1c0d5892c824edce327 -- 2.9.3 _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel