If the LZ4 library is available, handle LZ4-compressed and LZ4-stream-compressed messages on any channel. In stream compression mode, decompressed messages are stored sequentially in a pre-allocated buffer, which the decompression mechanism then uses when decompressing the following messages. Also update the spice-common submodule. --- spice-common | 2 +- src/spice-channel-priv.h | 12 ++++ src/spice-channel.c | 150 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 163 insertions(+), 1 deletion(-) diff --git a/spice-common b/spice-common index 30e8237..cde4a8b 160000 --- a/spice-common +++ b/spice-common @@ -1 +1 @@ -Subproject commit 30e8237934b4060513bd0bdcc5ea0caa1dcc7b59 +Subproject commit cde4a8bb67330006ee26e1c0d5892c824edce327 diff --git a/src/spice-channel-priv.h b/src/spice-channel-priv.h index 50aca5c..2eaf816 100644 --- a/src/spice-channel-priv.h +++ b/src/spice-channel-priv.h @@ -27,6 +27,10 @@ #include <sasl/sasl.h> #endif +#ifdef USE_LZ4 +#include<lz4.h> +#endif + #include "spice-channel.h" #include "spice-util-priv.h" #include "coroutine.h" @@ -62,6 +66,8 @@ struct _SpiceMsgIn { size_t psize; message_destructor_t pfree; SpiceMsgIn *parent; + bool stream_buf_in_use; + uint8_t header_offset; }; enum spice_channel_state { @@ -100,6 +106,12 @@ struct _SpiceChannelPrivate { uint64_t out_serial; uint64_t in_serial; +#ifdef USE_LZ4 + LZ4_streamDecode_t *lz4_in_stream; + char *in_stream_buf; + size_t in_stream_offset; +#endif + /* not swapped */ SpiceSession *session; GCoroutine coroutine; diff --git a/src/spice-channel.c b/src/spice-channel.c index af67931..5f07bd6 100644 --- a/src/spice-channel.c +++ b/src/spice-channel.c @@ -47,6 +47,14 @@ #include "gio-coroutine.h" +#ifdef USE_LZ4 +enum { + COMPRESS_STREAM_MIN_THRESHOLD = 6, + COMPRESS_STREAM_MAX_MSG_SIZE = 1024 * 64, /* Maximum message size for LZ4 stream compression */ + STREAM_BUF_SIZE = 1024 *64 + COMPRESS_STREAM_MAX_MSG_SIZE + 8 /* Recommended buffer size according to the api */ +}; +#endif + static void 
spice_channel_handle_msg(SpiceChannel *channel, SpiceMsgIn *msg); static void spice_channel_write_msg(SpiceChannel *channel, SpiceMsgOut *out); static void spice_channel_send_link(SpiceChannel *channel); @@ -149,6 +157,11 @@ static void spice_channel_constructed(GObject *gobject) if (disabled && strstr(disabled, desc)) c->disable_channel_msg = TRUE; +#ifdef USE_LZ4 + c->lz4_in_stream = LZ4_createStreamDecode(); /* NOTE(review): result is not NULL-checked here, although spice_channel_finalize tests it before freeing -- confirm */ + c->in_stream_buf = g_malloc(STREAM_BUF_SIZE); /* destination buffer for stream-mode decompression (see in_stream_get_ptr) */ + c->in_stream_offset = 0; +#endif spice_session_channel_new(c->session, channel); /* Chain up to the parent class */ @@ -185,6 +198,13 @@ static void spice_channel_finalize(GObject *gobject) g_mutex_clear(&c->xmit_queue_lock); +#ifdef USE_LZ4 + if(c->lz4_in_stream) + LZ4_freeStreamDecode(c->lz4_in_stream); + + g_free(c->in_stream_buf); +#endif + if (c->caps) g_array_free(c->caps, TRUE); @@ -504,6 +524,10 @@ SpiceMsgIn *spice_msg_in_new(SpiceChannel *channel) in = g_new0(SpiceMsgIn, 1); in->refcount = 1; in->channel = channel; +#ifdef USE_LZ4 + in->stream_buf_in_use = FALSE; /* both fields are already zero from g_new0() above */ + in->header_offset = 0; +#endif return in; } @@ -546,7 +570,15 @@ void spice_msg_in_unref(SpiceMsgIn *in) in->pfree(in->parsed); if (in->parent) { spice_msg_in_unref(in->parent); +#ifdef USE_LZ4 + } else if(in->header_offset > 0) { + /* in->data points header_offset bytes into its allocation; free from the allocation start */ + g_free(in->data - in->header_offset); + } else if(!in->stream_buf_in_use) { + /* in->data is not inside the channel's stream buffer, so it is freed below; when stream_buf_in_use is set nothing is freed */ +#else } else { +#endif g_free(in->data); } g_free(in); @@ -1939,6 +1971,119 @@ gboolean spice_channel_get_read_only(SpiceChannel *channel) return spice_session_get_read_only(channel->priv->session); } +#ifdef USE_LZ4 +static char* in_stream_get_ptr(SpiceChannel *channel) /* returns the address of in_stream_buf[in_stream_offset], where the next stream-decompressed message is written */ +{ + SpiceChannelPrivate *c; + + c = SPICE_CHANNEL(channel)->priv; + + return &c->in_stream_buf[c->in_stream_offset]; + +} + +static void in_stream_update(SpiceChannel *channel, int size) /* advances in_stream_offset by size */ +{ + SpiceChannelPrivate *c; + + c = SPICE_CHANNEL(channel)->priv; + /* Add size to 
the stream buffer offset & reset if needed + * so that place for new message is always available */ + c->in_stream_offset += size; + if (c->in_stream_offset >= STREAM_BUF_SIZE - COMPRESS_STREAM_MAX_MSG_SIZE) + c->in_stream_offset = 0; /* wrap to the start once no more than COMPRESS_STREAM_MAX_MSG_SIZE bytes remain past the offset */ +} + +static LZ4_streamDecode_t* in_stream_get_stream(SpiceChannel *channel) /* returns the channel's lz4_in_stream decoder handle */ +{ + SpiceChannelPrivate *c; + + c = SPICE_CHANNEL(channel)->priv; + + return c->lz4_in_stream; +} +#endif + +static int spice_channel_recv_compressed_msg(SpiceChannel *channel, + SpiceMsgIn *in, + int *msg_size, + int *msg_type) /* Parses in->data as a SpiceMsgCompressedData, decompresses its payload and, on success, copies the decompressed header into in->header, points in->data just past that header, and stores the decompressed message's size and type in *msg_size and *msg_type. Returns TRUE on success, FALSE otherwise. NOTE(review): defined after the #endif above, so it is also compiled when USE_LZ4 is unset, where every compression type reaches the default case. */ +{ + SpiceChannelPrivate *c = channel->priv; + int decompressed_size = 0; + char *decompressed; + uint32_t uncompressed_size; + SpiceMsgCompressedData *compressed_data_msg = (SpiceMsgCompressedData*)c->parser(in->data, + in->data + *msg_size, + *msg_type, + c->peer_hdr.minor_version, + &in->psize, + &in->pfree); + + if (compressed_data_msg == NULL) { /* NOTE(review): this is the parse of the still-compressed message, yet the log text below says "decompressed" */ + g_critical("failed to parse decompressed message: %s type %d", + c->name, *msg_type); + return FALSE; + } + if (compressed_data_msg->uncompressed_size == 0) { + spice_warning("Invalid uncompressed_size"); + return FALSE; /* NOTE(review): compressed_data_msg is released through in->pfree only on the success path; this and the later FALSE returns do not release it here -- confirm the caller does */ + } + + uncompressed_size = compressed_data_msg->uncompressed_size; + switch (compressed_data_msg->type) { +#ifdef USE_LZ4 + case SPICE_DATA_COMPRESSION_TYPE_STREAM_LZ4: + decompressed = in_stream_get_ptr(channel); /* decompress straight into the channel's stream buffer */ + decompressed_size = LZ4_decompress_safe_continue (in_stream_get_stream(channel), + (char*)compressed_data_msg->compressed_data, + decompressed, + compressed_data_msg->compressed_size, + compressed_data_msg->uncompressed_size); /* NOTE(review): uncompressed_size is used as the output capacity, but nothing visible here bounds it by COMPRESS_STREAM_MAX_MSG_SIZE or by the space left in in_stream_buf -- confirm it is validated elsewhere */ + break; + case SPICE_DATA_COMPRESSION_TYPE_LZ4: + decompressed = g_malloc(compressed_data_msg->uncompressed_size); /* one-off buffer; on success it is later released by spice_msg_in_unref via header_offset */ + decompressed_size = LZ4_decompress_safe((char*)compressed_data_msg->compressed_data, + decompressed, + compressed_data_msg->compressed_size, + compressed_data_msg->uncompressed_size); + break; +#endif + default: + spice_warning("Unknown Compression Type"); + return FALSE; + } + if (decompressed_size != 
uncompressed_size) { + spice_warning("Decompress Error decompressed_size=%d expected=%u", + decompressed_size, uncompressed_size); + return FALSE; /* NOTE(review): for SPICE_DATA_COMPRESSION_TYPE_LZ4 the g_malloc'd buffer in decompressed is not freed on this path */ + } + + if(decompressed_size > 0) { /* Decompression success */ + free(in->data); /* NOTE(review): plain free() here, while spice_msg_in_unref releases in->data with g_free() */ + /* point in->data the data location (after the header) */ + in->data = (uint8_t*)decompressed + spice_header_get_header_size(c->use_mini_header); /* NOTE(review): no visible check that decompressed_size is at least the header size -- confirm */ +#ifdef USE_LZ4 + if(compressed_data_msg->type == SPICE_DATA_COMPRESSION_TYPE_STREAM_LZ4) { + in->stream_buf_in_use = TRUE; /* tells spice_msg_in_unref not to free in->data */ + in_stream_update(channel, decompressed_size); + } else { /* SPICE_DATA_COMPRESSION_TYPE_LZ4 */ + in->header_offset = spice_header_get_header_size(c->use_mini_header); /* lets spice_msg_in_unref recover the start of the g_malloc'd buffer */ + } +#endif + if(in->pfree != NULL) + in->pfree((uint8_t*)compressed_data_msg); /* the parsed compressed message is not referenced after this point */ + /* copy new header and update variables */ + memcpy(in->header,decompressed, spice_header_get_header_size(c->use_mini_header)); + in->dpos = *msg_size = spice_header_get_msg_size(in->header, c->use_mini_header); + *msg_type = spice_header_get_msg_type(in->header, c->use_mini_header); + return TRUE; + } else { /* Decompression fail */ + spice_warning("Decompression Failed"); + return FALSE; + } +} + /* coroutine context */ G_GNUC_INTERNAL void spice_channel_recv_msg(SpiceChannel *channel, @@ -1969,6 +2114,11 @@ void spice_channel_recv_msg(SpiceChannel *channel, in->dpos = msg_size; msg_type = spice_header_get_msg_type(in->header, c->use_mini_header); + if (msg_type == SPICE_MSG_COMPRESSED_DATA) { /* unwrap a compressed message in place; msg_size and msg_type are updated to those of the decompressed message */ + if(!spice_channel_recv_compressed_msg(channel, in, &msg_size, &msg_type)) + goto end; + } + sub_list_offset = spice_header_get_msg_sub_list(in->header, c->use_mini_header); if (msg_type == SPICE_MSG_LIST || sub_list_offset) { -- 2.9.3 _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel