> > On Tue, 2018-05-08 at 10:26 +0100, Frediano Ziglio wrote: > > The current implementation of server and client assumes that a single > > data message contains an encoded frame. > > This is not a problem for most encoding but for MJPEG this causes > > the client to fail decoding. > > Collapse frame data into a single message before sending to the client. > > This is done in the channel as the channel code is responsible to take care > > of client protocol details. This allows for instance to support chunked > > transfer to client if implemented. > > So the proper fix if we didn't care about compatibility with old/other > clients would be to implement the "chunked transfer" in the client, > right? But we don't want to break the compatibility. So it is basically > a property of the protocol that a frame must be sent in a single > message. > Yes, there are different assumptions in the code about this, not clear if this was due by MJPEG encoding (first and only encoding for long time), or something by design, the documentation on the protocol is basically nothing (just the "stream_data" message with no description at all). > Why not also send it in a single message from the streaming agent to > the server, so that the whole message can be forwarded and the messages > are 1:1? Instead of adding this accumulation code and having to copy > the frame in the process? > The agent send a single message, but reads/writes to the device are not atomic. Note that the current protocol introduce additional delays as the frames cannot be partially decoded but must wait for the full message (maybe the client can change its read code to handle this, at the moment it does nothing about, on the server is less of a problem as the message is build quickly from device to memory so not much delay is added). > If I don't understand this properly, please excuse my ignorace... > > Cheers, > Lukas > > > Signed-off-by: Frediano Ziglio <fziglio@xxxxxxxxxx> > > --- > > server/red-stream-device.c | 10 +++-- > > server/stream-channel.c | 83 +++++++++++++++++++++++++++++++++----- > > server/stream-channel.h | 12 ++++++ > > 3 files changed, 90 insertions(+), 15 deletions(-) > > > > diff --git a/server/red-stream-device.c b/server/red-stream-device.c > > index df6a366f..864be99e 100644 > > --- a/server/red-stream-device.c > > +++ b/server/red-stream-device.c > > @@ -314,11 +314,13 @@ handle_msg_data(StreamDevice *dev, > > SpiceCharDeviceInstance *sin) > > if (n <= 0) { > > break; > > } > > - // TODO collect all message ?? > > - // up: we send a single frame together > > - // down: guest can cause a crash > > - stream_channel_send_data(dev->stream_channel, buf, n, > > reds_get_mm_time()); > > + uint32_t mm_time = reds_get_mm_time(); > > + if (dev->msg_pos == 0) { > > + stream_channel_start_data(dev->stream_channel, dev->hdr.size, > > mm_time); > > + } > > + stream_channel_send_data(dev->stream_channel, buf, n, mm_time); > > dev->hdr.size -= n; > > + dev->msg_pos += n; > > } > > > > return dev->hdr.size == 0; > > diff --git a/server/stream-channel.c b/server/stream-channel.c > > index fc409ee6..0ac7ba1d 100644 > > --- a/server/stream-channel.c > > +++ b/server/stream-channel.c > > @@ -44,6 +44,7 @@ > > > > typedef struct StreamChannelClient StreamChannelClient; > > typedef struct StreamChannelClientClass StreamChannelClientClass; > > +typedef struct StreamDataItem StreamDataItem; > > > > /* we need to inherit from CommonGraphicsChannelClient > > * to get buffer handling */ > > @@ -74,6 +75,10 @@ struct StreamChannel { > > > > StreamQueueStat queue_stat; > > > > + /* pending partial data item */ > > + StreamDataItem *data_item; > > + uint32_t data_item_pos; > > + > > /* callback to notify when a stream should be started or stopped */ > > stream_channel_start_proc start_cb; > > void *start_opaque; > > @@ -105,12 +110,12 @@ typedef struct StreamCreateItem { > > SpiceMsgDisplayStreamCreate stream_create; > > } StreamCreateItem; > > > > -typedef struct StreamDataItem { > > +struct StreamDataItem { > > RedPipeItem base; > > StreamChannel *channel; > > // NOTE: this must be the last field in the structure > > SpiceMsgDisplayStreamData data; > > -} StreamDataItem; > > +}; > > > > #define PRIMARY_SURFACE_ID 0 > > > > @@ -130,6 +135,16 @@ stream_channel_client_init(StreamChannelClient > > *client) > > client->stream_id = -1; > > } > > > > +static void > > +stream_channel_unref_data_item(StreamChannel *channel) > > +{ > > + if (channel->data_item) { > > + red_pipe_item_unref(&channel->data_item->base); > > + channel->data_item = NULL; > > + channel->data_item_pos = 0; > > + } > > +} > > + > > static void > > request_new_stream(StreamChannel *channel, StreamMsgStartStop *start) > > { > > @@ -153,6 +168,7 @@ stream_channel_client_on_disconnect(RedChannelClient > > *rcc) > > channel->stream_id = -1; > > channel->width = 0; > > channel->height = 0; > > + stream_channel_unref_data_item(channel); > > > > // send stream stop to device > > StreamMsgStartStop stop = { 0, }; > > @@ -452,6 +468,16 @@ stream_channel_constructed(GObject *object) > > reds_register_channel(reds, red_channel); > > } > > > > +static void > > +stream_channel_finalize(GObject *object) > > +{ > > + StreamChannel *channel = STREAM_CHANNEL(object); > > + > > + stream_channel_unref_data_item(channel); > > + > > + G_OBJECT_CLASS(stream_channel_parent_class)->finalize(object); > > +} > > + > > static void > > stream_channel_class_init(StreamChannelClass *klass) > > { > > @@ -459,6 +485,7 @@ stream_channel_class_init(StreamChannelClass *klass) > > RedChannelClass *channel_class = RED_CHANNEL_CLASS(klass); > > > > object_class->constructed = stream_channel_constructed; > > + object_class->finalize = stream_channel_finalize; > > > > channel_class->parser = > > spice_get_client_channel_parser(SPICE_CHANNEL_DISPLAY, NULL); > > channel_class->handle_message = handle_message; > > @@ -532,14 +559,18 @@ data_item_free(RedPipeItem *base) > > { > > StreamDataItem *pipe_item = SPICE_UPCAST(StreamDataItem, base); > > > > - stream_channel_update_queue_stat(pipe_item->channel, -1, > > -pipe_item->data.data_size); > > + if (pipe_item->channel->data_item != pipe_item) { > > + stream_channel_update_queue_stat(pipe_item->channel, -1, > > -pipe_item->data.data_size); > > + } > > > > g_free(pipe_item); > > } > > > > -static StreamDataItem* > > -stream_data_item_new(StreamChannel *channel, size_t size, uint32_t > > mm_time) > > +static void > > +stream_channel_init_data_item(StreamChannel *channel, size_t size, > > uint32_t mm_time) > > { > > + stream_channel_unref_data_item(channel); > > + > > StreamDataItem *item = g_malloc(sizeof(*item) + size); > > red_pipe_item_init_full(&item->base, RED_PIPE_ITEM_TYPE_STREAM_DATA, > > data_item_free); > > @@ -548,7 +579,22 @@ stream_data_item_new(StreamChannel *channel, size_t > > size, uint32_t mm_time) > > item->data.data_size = size; > > item->channel = channel; > > > > - return item; > > + channel->data_item = item; > > + channel->data_item_pos = 0; > > +} > > + > > +void > > +stream_channel_start_data(StreamChannel *channel, size_t size, uint32_t > > mm_time) > > +{ > > + // see stream_channel_send_data comment > > + if (channel->stream_id < 0) { > > + return; > > + } > > + > > + // TODO this collects all chunks in a single message > > + // up: we send a single frame together (more compatible) > > + // down: guest can cause a crash due to DoS. As a safe measure we > > limit the maximum message > > + stream_channel_init_data_item(channel, MIN(size, 32*1024*1024), > > mm_time); > > } > > > > void > > @@ -563,11 +609,25 @@ stream_channel_send_data(StreamChannel *channel, > > const void *data, size_t size, > > > > RedChannel *red_channel = RED_CHANNEL(channel); > > > > - StreamDataItem *item = stream_data_item_new(channel, size, mm_time); > > - stream_channel_update_queue_stat(channel, 1, size); > > - // TODO try to optimize avoiding the copy > > - memcpy(item->data.data, data, size); > > - red_channel_pipes_add(red_channel, &item->base); > > + while (size) { > > + if (channel->data_item == NULL) { > > + stream_channel_init_data_item(channel, size, mm_time); > > + } > > + > > + StreamDataItem *item = channel->data_item; > > + > > + size_t copy_size = item->data.data_size - channel->data_item_pos; > > + copy_size = MIN(copy_size, size); > > + // TODO try to optimize avoiding the copy > > + memcpy(item->data.data + channel->data_item_pos, data, copy_size); > > + size -= copy_size; > > + channel->data_item_pos += copy_size; > > + if (channel->data_item_pos == item->data.data_size) { > > + channel->data_item = NULL; > > + stream_channel_update_queue_stat(channel, 1, > > item->data.data_size); > > + red_channel_pipes_add(red_channel, &item->base); > > + } > > + } > > } > > > > void > > @@ -607,6 +667,7 @@ stream_channel_reset(StreamChannel *channel) > > channel->stream_id = -1; > > channel->width = 0; > > channel->height = 0; > > + stream_channel_unref_data_item(channel); > > > > if (!red_channel_is_connected(red_channel)) { > > return; > > diff --git a/server/stream-channel.h b/server/stream-channel.h > > index e8bec80b..18a1bdea 100644 > > --- a/server/stream-channel.h > > +++ b/server/stream-channel.h > > @@ -60,6 +60,18 @@ struct StreamMsgStartStop; > > > > void stream_channel_change_format(StreamChannel *channel, > > const struct StreamMsgFormat *fmt); > > + > > +/** > > + * Tell the channel that a new data packet is starting. > > + * This can be used to group all chunks together. > > + */ > > +void stream_channel_start_data(StreamChannel *channel, > > + size_t size, > > + uint32_t mm_time); > > + > > +/** > > + * Send to channel a chunk of data. > > + */ > > void stream_channel_send_data(StreamChannel *channel, > > const void *data, size_t size, > > uint32_t mm_time); > _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel