On Wed, 2018-05-09 at 05:18 -0400, Frediano Ziglio wrote:
> >
> > On Tue, 2018-05-08 at 10:26 +0100, Frediano Ziglio wrote:
> > > The current implementation of server and client assumes that a single
> > > data message contains an encoded frame.
> > > This is not a problem for most encoding but for MJPEG this causes
> > > the client to fail decoding.
> > > Collapse frame data into a single message before sending to the client.
> > > This is done in the channel as the channel code is responsible to take care
> > > of client protocol details. This allows for instance to support chunked
> > > transfer to client if implemented.
> >
> > So the proper fix if we didn't care about compatibility with old/other
> > clients would be to implement the "chunked transfer" in the client,
> > right? But we don't want to break the compatibility. So it is basically
> > a property of the protocol that a frame must be sent in a single
> > message.
> >
>
> Yes, there are different assumptions in the code about this, not clear if
> this was due to MJPEG encoding (first and only encoding for a long time), or
> something by design, the documentation on the protocol is basically
> nothing (just the "stream_data" message with no description at all).

Ok, so perhaps defining this (using the limitations of the current
implementation) should be part of the series?

> > Why not also send it in a single message from the streaming agent to
> > the server, so that the whole message can be forwarded and the messages
> > are 1:1? Instead of adding this accumulation code and having to copy
> > the frame in the process?
> >
>
> The agent sends a single message, but reads/writes to the device are
> not atomic. Note that the current protocol introduces additional
> delays as the frames cannot be partially decoded but must wait for the
> full message (maybe the client can change its read code to handle this,
> at the moment it does nothing about it, on the server it is less of a problem
> as the message is built quickly from device to memory so not much delay
> is added).

This is the part I don't understand... AFAICS, you read the whole
message in red-stream-device.c:handle_msg_data(). That should be the
whole frame? Then you send the whole frame with
stream_channel_send_data(). So it should never be partial?

Also see below.

> > If I don't understand this properly, please excuse my ignorance...
> >
> > Cheers,
> > Lukas
> >
> > > Signed-off-by: Frediano Ziglio <fziglio@xxxxxxxxxx>
> > > ---
> > >  server/red-stream-device.c | 10 +++--
> > >  server/stream-channel.c    | 83 +++++++++++++++++++++++++++++++++-----
> > >  server/stream-channel.h    | 12 ++++++
> > >  3 files changed, 90 insertions(+), 15 deletions(-)
> > >
> > > diff --git a/server/red-stream-device.c b/server/red-stream-device.c
> > > index df6a366f..864be99e 100644
> > > --- a/server/red-stream-device.c
> > > +++ b/server/red-stream-device.c
> > > @@ -314,11 +314,13 @@ handle_msg_data(StreamDevice *dev,
> > > SpiceCharDeviceInstance *sin)
> > >          if (n <= 0) {
> > >              break;
> > >          }
> > > -        // TODO collect all message ??
> > > -        // up: we send a single frame together
> > > -        // down: guest can cause a crash
> > > -        stream_channel_send_data(dev->stream_channel, buf, n,
> > > reds_get_mm_time());
> > > +        uint32_t mm_time = reds_get_mm_time();
> > > +        if (dev->msg_pos == 0) {
> > > +            stream_channel_start_data(dev->stream_channel, dev->hdr.size,
> > > mm_time);
> > > +        }
> > > +        stream_channel_send_data(dev->stream_channel, buf, n, mm_time);
> > >          dev->hdr.size -= n;
> > > +        dev->msg_pos += n;
> > >      }
> > >
> > >      return dev->hdr.size == 0;
> > > diff --git a/server/stream-channel.c b/server/stream-channel.c
> > > index fc409ee6..0ac7ba1d 100644
> > > --- a/server/stream-channel.c
> > > +++ b/server/stream-channel.c
> > > @@ -44,6 +44,7 @@
> > >
> > >  typedef struct StreamChannelClient StreamChannelClient;
> > >  typedef struct StreamChannelClientClass StreamChannelClientClass;
> > > +typedef struct StreamDataItem StreamDataItem;
> > >
> > >  /* we need to inherit from CommonGraphicsChannelClient
> > >   * to get buffer handling */
> > > @@ -74,6 +75,10 @@ struct StreamChannel {
> > >
> > >      StreamQueueStat queue_stat;
> > >
> > > +    /* pending partial data item */
> > > +    StreamDataItem *data_item;
> > > +    uint32_t data_item_pos;
> > > +
> > >      /* callback to notify when a stream should be started or stopped */
> > >      stream_channel_start_proc start_cb;
> > >      void *start_opaque;
> > > @@ -105,12 +110,12 @@ typedef struct StreamCreateItem {
> > >      SpiceMsgDisplayStreamCreate stream_create;
> > >  } StreamCreateItem;
> > >
> > > -typedef struct StreamDataItem {
> > > +struct StreamDataItem {
> > >      RedPipeItem base;
> > >      StreamChannel *channel;
> > >      // NOTE: this must be the last field in the structure
> > >      SpiceMsgDisplayStreamData data;
> > > -} StreamDataItem;
> > > +};
> > >
> > >  #define PRIMARY_SURFACE_ID 0
> > >
> > > @@ -130,6 +135,16 @@ stream_channel_client_init(StreamChannelClient
> > > *client)
> > >      client->stream_id = -1;
> > >  }
> > >
> > > +static void
> > > +stream_channel_unref_data_item(StreamChannel *channel)
> > > +{
> > > +    if (channel->data_item) {
> > > +        red_pipe_item_unref(&channel->data_item->base);
> > > +        channel->data_item = NULL;
> > > +        channel->data_item_pos = 0;
> > > +    }
> > > +}
> > > +
> > >  static void
> > >  request_new_stream(StreamChannel *channel, StreamMsgStartStop *start)
> > >  {
> > > @@ -153,6 +168,7 @@ stream_channel_client_on_disconnect(RedChannelClient
> > > *rcc)
> > >      channel->stream_id = -1;
> > >      channel->width = 0;
> > >      channel->height = 0;
> > > +    stream_channel_unref_data_item(channel);
> > >
> > >      // send stream stop to device
> > >      StreamMsgStartStop stop = { 0, };
> > > @@ -452,6 +468,16 @@ stream_channel_constructed(GObject *object)
> > >      reds_register_channel(reds, red_channel);
> > >  }
> > >
> > > +static void
> > > +stream_channel_finalize(GObject *object)
> > > +{
> > > +    StreamChannel *channel = STREAM_CHANNEL(object);
> > > +
> > > +    stream_channel_unref_data_item(channel);
> > > +
> > > +    G_OBJECT_CLASS(stream_channel_parent_class)->finalize(object);
> > > +}
> > > +
> > >  static void
> > >  stream_channel_class_init(StreamChannelClass *klass)
> > >  {
> > > @@ -459,6 +485,7 @@ stream_channel_class_init(StreamChannelClass *klass)
> > >      RedChannelClass *channel_class = RED_CHANNEL_CLASS(klass);
> > >
> > >      object_class->constructed = stream_channel_constructed;
> > > +    object_class->finalize = stream_channel_finalize;
> > >
> > >      channel_class->parser =
> > > spice_get_client_channel_parser(SPICE_CHANNEL_DISPLAY, NULL);
> > >      channel_class->handle_message = handle_message;
> > > @@ -532,14 +559,18 @@ data_item_free(RedPipeItem *base)
> > >  {
> > >      StreamDataItem *pipe_item = SPICE_UPCAST(StreamDataItem, base);
> > >
> > > -    stream_channel_update_queue_stat(pipe_item->channel, -1,
> > > -pipe_item->data.data_size);
> > > +    if (pipe_item->channel->data_item != pipe_item) {
> > > +        stream_channel_update_queue_stat(pipe_item->channel, -1,
> > > -pipe_item->data.data_size);
> > > +    }
> > >
> > >      g_free(pipe_item);
> > >  }
> > >
> > > -static StreamDataItem*
> > > -stream_data_item_new(StreamChannel *channel, size_t size, uint32_t
> > > mm_time)
> > > +static void
> > > +stream_channel_init_data_item(StreamChannel *channel, size_t size,
> > > uint32_t mm_time)
> > >  {
> > > +    stream_channel_unref_data_item(channel);
> > > +
> > >      StreamDataItem *item = g_malloc(sizeof(*item) + size);
> > >      red_pipe_item_init_full(&item->base, RED_PIPE_ITEM_TYPE_STREAM_DATA,
> > >                              data_item_free);
> > > @@ -548,7 +579,22 @@ stream_data_item_new(StreamChannel *channel, size_t
> > > size, uint32_t mm_time)
> > >      item->data.data_size = size;
> > >      item->channel = channel;
> > >
> > > -    return item;
> > > +    channel->data_item = item;
> > > +    channel->data_item_pos = 0;
> > > +}
> > > +
> > > +void
> > > +stream_channel_start_data(StreamChannel *channel, size_t size, uint32_t
> > > mm_time)
> > > +{
> > > +    // see stream_channel_send_data comment
> > > +    if (channel->stream_id < 0) {
> > > +        return;
> > > +    }
> > > +
> > > +    // TODO this collects all chunks in a single message
> > > +    // up: we send a single frame together (more compatible)
> > > +    // down: guest can cause a crash due to DoS. As a safe measure we
> > > limit the maximum message
> > > +    stream_channel_init_data_item(channel, MIN(size, 32*1024*1024),
> > > mm_time);
> > >  }
> > >
> > >  void
> > > @@ -563,11 +609,25 @@ stream_channel_send_data(StreamChannel *channel,
> > > const void *data, size_t size,
> > >
> > >      RedChannel *red_channel = RED_CHANNEL(channel);
> > >
> > > -    StreamDataItem *item = stream_data_item_new(channel, size, mm_time);
> > > -    stream_channel_update_queue_stat(channel, 1, size);
> > > -    // TODO try to optimize avoiding the copy
> > > -    memcpy(item->data.data, data, size);
> > > -    red_channel_pipes_add(red_channel, &item->base);
> > > +    while (size) {
> > > +        if (channel->data_item == NULL) {
> > > +            stream_channel_init_data_item(channel, size, mm_time);
> > > +        }
> > > +
> > > +        StreamDataItem *item = channel->data_item;
> > > +
> > > +        size_t copy_size = item->data.data_size - channel->data_item_pos;
> > > +        copy_size = MIN(copy_size, size);
> > > +        // TODO try to optimize avoiding the copy
> > > +        memcpy(item->data.data + channel->data_item_pos, data, copy_size);
> > > +        size -= copy_size;
> > > +        channel->data_item_pos += copy_size;
> > > +        if (channel->data_item_pos == item->data.data_size) {
> > > +            channel->data_item = NULL;
> > > +            stream_channel_update_queue_stat(channel, 1,
> > > item->data.data_size);
> > > +            red_channel_pipes_add(red_channel, &item->base);
> > > +        }
> > > +    }

What does the while (size) loop do here? It will do more than one
iteration only if copy_size < size, which means there is not enough
space in the item buffer and in that case it seems to me it will loop
forever? Am I missing something?
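
To check my own reading I reduced the loop to a standalone sketch. Item,
Channel, start_data() and send_data() are my own stand-ins, not names from
the patch, and the hand-off to the pipe is replaced by a counter. In this
model the loop does terminate: once the pending item is full it is handed
off and the pending pointer is reset to NULL, so the next iteration
allocates a new item sized for the remaining bytes. One thing the sketch
does differently from the hunk above: it advances the source pointer after
each copy, which I don't see in the patch, so a second iteration there
would copy from the start of the buffer again, unless I'm misreading.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define MIN(a, b) ((a) < (b) ? (a) : (b))

/* Hypothetical stand-in for StreamDataItem: a size plus trailing payload. */
typedef struct {
    size_t data_size;
    unsigned char data[];
} Item;

/* Hypothetical stand-in for the accumulation state kept in StreamChannel. */
typedef struct {
    Item *item;           /* pending partial item, NULL if none */
    size_t pos;           /* bytes already copied into the pending item */
    unsigned flushed;     /* number of completed items handed off */
    size_t flushed_bytes; /* total payload bytes handed off */
} Channel;

/* Drop any pending item and allocate a new one of the given size. */
static void start_data(Channel *ch, size_t size)
{
    free(ch->item);
    ch->item = malloc(sizeof(Item) + size);
    if (ch->item == NULL) {
        abort();
    }
    ch->item->data_size = size;
    ch->pos = 0;
}

/* Append a chunk; hand off the pending item each time it becomes full. */
static void send_data(Channel *ch, const void *data, size_t size)
{
    const unsigned char *src = data;

    while (size) {
        if (ch->item == NULL) {
            start_data(ch, size);
        }
        Item *item = ch->item;

        size_t copy_size = MIN(item->data_size - ch->pos, size);
        memcpy(item->data + ch->pos, src, copy_size);
        src += copy_size; /* assumption: not present in the quoted hunk */
        size -= copy_size;
        ch->pos += copy_size;
        assert(ch->pos <= item->data_size);

        if (ch->pos == item->data_size) {
            /* the real code would queue the item on the pipe here */
            ch->item = NULL;
            ch->flushed++;
            ch->flushed_bytes += item->data_size;
            free(item);
        }
    }
}
```

With a zeroed Channel, start_data(&ch, 4) followed by a 6-byte
send_data() goes through the loop twice: the first pass fills and hands
off the 4-byte item, the second allocates a 2-byte item for the rest and
hands that off too, leaving nothing pending.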
> > >  }
> > >
> > >  void
> > > @@ -607,6 +667,7 @@ stream_channel_reset(StreamChannel *channel)
> > >      channel->stream_id = -1;
> > >      channel->width = 0;
> > >      channel->height = 0;
> > > +    stream_channel_unref_data_item(channel);
> > >
> > >      if (!red_channel_is_connected(red_channel)) {
> > >          return;
> > > diff --git a/server/stream-channel.h b/server/stream-channel.h
> > > index e8bec80b..18a1bdea 100644
> > > --- a/server/stream-channel.h
> > > +++ b/server/stream-channel.h
> > > @@ -60,6 +60,18 @@ struct StreamMsgStartStop;
> > >
> > >  void stream_channel_change_format(StreamChannel *channel,
> > >                                    const struct StreamMsgFormat *fmt);
> > > +
> > > +/**
> > > + * Tell the channel that a new data packet is starting.
> > > + * This can be used to group all chunks together.
> > > + */
> > > +void stream_channel_start_data(StreamChannel *channel,
> > > +                               size_t size,
> > > +                               uint32_t mm_time);
> > > +
> > > +/**
> > > + * Send to channel a chunk of data.
> > > + */
> > >  void stream_channel_send_data(StreamChannel *channel,
> > >                                const void *data, size_t size,
> > >                                uint32_t mm_time);