The compressed message type is CompressedData, which contains the compression type (1 byte), followed by the uncompressed data size (4 bytes), followed by the compressed data size (4 bytes), followed by the compressed data. If the SPICE_USBREDIR_CAP_DATA_COMPRESS_LZ4 capability is available and data_size > COMPRESS_THRESHOLD, data will be sent compressed; otherwise data will be sent uncompressed (this is also the fallback if compression fails). --- server/spicevmc.c | 155 ++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 145 insertions(+), 10 deletions(-) diff --git a/server/spicevmc.c b/server/spicevmc.c index b662d94..be924df 100644 --- a/server/spicevmc.c +++ b/server/spicevmc.c @@ -34,6 +34,9 @@ #include "red-channel.h" #include "reds.h" #include "migration-protocol.h" +#ifdef USE_LZ4 +#include <lz4.h> +#endif /* todo: add flow control. i.e., * (a) limit the tokens available for the client @@ -41,10 +44,13 @@ */ /* 64K should be enough for all but the largest writes + 32 bytes hdr */ #define BUF_SIZE (64 * 1024 + 32) +#define COMPRESS_THRESHOLD 1000 typedef struct RedVmcPipeItem { RedPipeItem base; + SpiceDataCompressionType type; + uint32_t uncompressed_data_size; /* writes which don't fit this will get split, this is not a problem */ uint8_t buf[BUF_SIZE]; uint32_t buf_used; @@ -105,6 +111,55 @@ enum { RED_PIPE_ITEM_TYPE_PORT_EVENT, }; +static uint8_t *spicevmc_red_channel_alloc_msg_rcv_buf(RedChannelClient *rcc, + uint16_t type, + uint32_t size); + +static void spicevmc_red_channel_release_msg_rcv_buf(RedChannelClient *rcc, + uint16_t type, + uint32_t size, + uint8_t *msg); + +static RedVmcPipeItem* try_compress_lz4(SpiceVmcState *state, int n, RedVmcPipeItem *msg_item) { + RedVmcPipeItem *msg_item_compressed; + int bound, compressed_data_count; + + if (reds_stream_get_family(state->rcc->stream) == AF_UNIX) { + /* AF_LOCAL - data will not be compressed */ + return NULL; + } + if (n <= COMPRESS_THRESHOLD) { + /* n <= threshold - data will not be compressed */ + return NULL; 
+ } + if (!red_channel_test_remote_cap(&state->channel, SPICE_SPICEVMC_CAP_DATA_COMPRESS_LZ4)) { + /* Client doesn't has compression cap - data will not be compressed */ + return NULL; + } + bound = LZ4_compressBound(n); + if (bound == 0 || bound >= BUF_SIZE) { + /* bound is invalid - data will not be compressed */ + return NULL; + } + msg_item_compressed = spice_new0(RedVmcPipeItem, 1); + red_pipe_item_init(&msg_item_compressed->base, RED_PIPE_ITEM_TYPE_SPICEVMC_DATA); + compressed_data_count = LZ4_compress_default((char*)&msg_item->buf, + (char*)&msg_item_compressed->buf, + n, + bound); + + if (compressed_data_count > 0) { + msg_item_compressed->type = SPICE_DATA_COMPRESSION_TYPE_LZ4; + msg_item_compressed->uncompressed_data_size = n; + msg_item_compressed->buf_used = compressed_data_count; + free(msg_item); + return msg_item_compressed; + }/* LZ4 compression failed-fallback a non-compressed data is to be sent */ + spice_warning("Compress Error"); + free(msg_item_compressed); + return NULL; +} + static RedPipeItem *spicevmc_chardev_read_msg_from_dev(SpiceCharDeviceInstance *sin, void *opaque) { @@ -121,6 +176,7 @@ static RedPipeItem *spicevmc_chardev_read_msg_from_dev(SpiceCharDeviceInstance * if (!state->pipe_item) { msg_item = spice_new0(RedVmcPipeItem, 1); + msg_item->type = SPICE_DATA_COMPRESSION_TYPE_NONE; red_pipe_item_init(&msg_item->base, RED_PIPE_ITEM_TYPE_SPICEVMC_DATA); } else { spice_assert(state->pipe_item->buf_used == 0); @@ -132,6 +188,15 @@ static RedPipeItem *spicevmc_chardev_read_msg_from_dev(SpiceCharDeviceInstance * sizeof(msg_item->buf)); if (n > 0) { spice_debug("read from dev %d", n); +#ifdef USE_LZ4 + RedVmcPipeItem *msg_item_compressed; + + msg_item_compressed = try_compress_lz4(state, n, msg_item); + if (msg_item_compressed != NULL) { + return (RedPipeItem *)msg_item_compressed; + } +#endif + msg_item->uncompressed_data_size = n; msg_item->buf_used = n; return &msg_item->base; } else { @@ -275,11 +340,52 @@ static int 
spicevmc_channel_client_handle_migrate_data(RedChannelClient *rcc, return red_char_device_restore(state->chardev, &mig_data->base); } -static int spicevmc_red_channel_client_handle_message(RedChannelClient *rcc, - uint16_t type, +static int try_handle_compressed_msg(RedChannelClient *rcc, SpiceMsgCompressedData *compressed_data_msg) { + /*NOTE: *decompressed is free by the char-device */ + SpiceVmcState *state; + int decompressed_size; + char *decompressed; + + state = spicevmc_red_channel_client_get_state(rcc); + decompressed = (char*)spicevmc_red_channel_alloc_msg_rcv_buf(rcc,SPICE_MSGC_SPICEVMC_DATA, + compressed_data_msg->uncompressed_size); + switch (compressed_data_msg->type) { +#ifdef USE_LZ4 + case SPICE_DATA_COMPRESSION_TYPE_LZ4: + decompressed_size = LZ4_decompress_safe ((char*)compressed_data_msg->compressed_data, + decompressed, + compressed_data_msg->compressed_size, + compressed_data_msg->uncompressed_size); + break; +#endif + default: + spice_warning("Invalid Compression Type"); + spicevmc_red_channel_release_msg_rcv_buf(rcc,SPICE_MSGC_SPICEVMC_DATA, + compressed_data_msg->uncompressed_size, + (uint8_t*)decompressed); + return FALSE; + } + if (decompressed_size != compressed_data_msg->uncompressed_size) { + spice_warning("Decompression Error"); + spicevmc_red_channel_release_msg_rcv_buf(rcc, SPICE_MSGC_SPICEVMC_DATA, + compressed_data_msg->uncompressed_size, + (uint8_t*)decompressed); + return FALSE; + } + spice_assert(state->recv_from_client_buf->buf == (uint8_t*)decompressed); + state->recv_from_client_buf->buf_used = decompressed_size; + red_char_device_write_buffer_add(state->chardev, state->recv_from_client_buf); + state->recv_from_client_buf = NULL; + return TRUE; + +} + +static int spicevmc_red_channel_client_handle_message_parsed(RedChannelClient *rcc, uint32_t size, - uint8_t *msg) -{ + uint16_t type, + void *msg) +{ /*NOTE: *msg free by free() (when cb to spicevmc_red_channel_release_msg_rcv_buf + *with the compressed msg type)*/ 
SpiceVmcState *state; SpiceCharDeviceInterface *sif; @@ -293,16 +399,22 @@ static int spicevmc_red_channel_client_handle_message(RedChannelClient *rcc, red_char_device_write_buffer_add(state->chardev, state->recv_from_client_buf); state->recv_from_client_buf = NULL; break; + case SPICE_MSGC_SPICEVMC_COMPRESSED_DATA: { + if (!try_handle_compressed_msg(rcc, (SpiceMsgCompressedData*)msg)) { + return FALSE; + } + break; + } case SPICE_MSGC_PORT_EVENT: if (size != sizeof(uint8_t)) { spice_warning("bad port event message size"); return FALSE; } if (sif->base.minor_version >= 2 && sif->event != NULL) - sif->event(state->chardev_sin, *msg); + sif->event(state->chardev_sin, *(uint8_t*)msg); break; default: - return red_channel_client_handle_message(rcc, size, type, msg); + return red_channel_client_handle_message(rcc, size, type, (uint8_t*)msg); } return TRUE; @@ -360,8 +472,27 @@ static void spicevmc_red_channel_send_data(RedChannelClient *rcc, { RedVmcPipeItem *i = SPICE_UPCAST(RedVmcPipeItem, item); - red_channel_client_init_send_data(rcc, SPICE_MSG_SPICEVMC_DATA, item); - spice_marshaller_add_ref(m, i->buf, i->buf_used); + switch (i->type){ + case SPICE_DATA_COMPRESSION_TYPE_NONE: + red_channel_client_init_send_data(rcc, SPICE_MSG_SPICEVMC_DATA, item); + spice_marshaller_add_ref(m, i->buf, i->buf_used); + break; + case SPICE_DATA_COMPRESSION_TYPE_LZ4: { + SpiceMsgCompressedData compressed_msg; + + red_channel_client_init_send_data(rcc, SPICE_MSG_SPICEVMC_COMPRESSED_DATA, item); + compressed_msg.type = SPICE_DATA_COMPRESSION_TYPE_LZ4; + compressed_msg.uncompressed_size = i->uncompressed_data_size; + compressed_msg.compressed_size = i->buf_used; + + spice_marshall_SpiceMsgCompressedData(m, &compressed_msg); + spice_marshaller_add_ref(m, i->buf, i->buf_used); + break; + } + default: + g_assert_not_reached(); + } + } static void spicevmc_red_channel_send_migrate_data(RedChannelClient *rcc, @@ -494,16 +625,20 @@ RedCharDevice *spicevmc_device_connect(RedsState *reds, 
channel_cbs.handle_migrate_flush_mark = spicevmc_channel_client_handle_migrate_flush_mark; channel_cbs.handle_migrate_data = spicevmc_channel_client_handle_migrate_data; - state = (SpiceVmcState*)red_channel_create(sizeof(SpiceVmcState), reds, + state = (SpiceVmcState*)red_channel_create_parser(sizeof(SpiceVmcState), reds, reds_get_core_interface(reds), channel_type, id[channel_type]++, FALSE /* handle_acks */, - spicevmc_red_channel_client_handle_message, + spice_get_client_channel_parser(SPICE_CHANNEL_USBREDIR, NULL), + spicevmc_red_channel_client_handle_message_parsed, &channel_cbs, SPICE_MIGRATE_NEED_FLUSH | SPICE_MIGRATE_NEED_DATA_TRANSFER); red_channel_init_outgoing_messages_window(&state->channel); client_cbs.connect = spicevmc_connect; red_channel_register_client_cbs(&state->channel, &client_cbs, NULL); +#ifdef USE_LZ4 + red_channel_set_cap(&state->channel, SPICE_SPICEVMC_CAP_DATA_COMPRESS_LZ4); +#endif state->chardev = red_char_device_spicevmc_new(sin, reds, state); state->chardev_sin = sin; -- 2.5.5 _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel