On Tue, 2017-08-22 at 11:26 -0400, Frediano Ziglio wrote: > > > > So I was reviewing this code and had a few comments but then I > > realized > > that we already have a basic flow control mechanism for char > > devices. > > RedCharDeviceClient has an implementation based on 'tokens'. Is > > there > > any reason that we can't use this instead of re-implementing it > > here? > > > > I think the main reason is that the token implementation requires > using the character device as a pass-through device for the client > while this device is not a pass-through. Yes, it's not a pass-through, but when I looked, I had thought that it would be possible to use the RedCharDeviceClient for our purpose. It's not explicitly a pass-through: the data can be transformed (or dropped) by the implementation of RedCharDeviceClass::read_one_msg_from_device(). But now I see that some of our char device reads may result in many RedPipeItems being sent (e.g. reading a STREAM_TYPE_FORMAT message results in us sending the following pipe items over the stream channel: STREAM_DESTROY, SURFACE_DESTROY, SURFACE_CREATE, DISPLAY_MARK, STREAM_CREATE, and STREAM_ACTIVATE_REPORT)... > > This implementation surely does not solve the entire problem. > The streaming in the DisplayChannel protocol is supposed to use > stream reports on the server <-> client chat and this should be > propagated to the guest trying to reduce the bandwidth usage > (reducing frames and/or quality). Currently I enable these > reports but there's no handling of them. Also there are no messages > for the guest for this information. > > Basically this patch solves one part of the issue. It prevents > the guest <-> server queue from growing indefinitely. OK. Even so, it seems a bit of a shame to have to sort of re-implement flow control here. Maybe there's no choice... > > Frediano > > > > > > > On Wed, 2017-06-14 at 16:40 +0100, Frediano Ziglio wrote: > > > Do not allow the guest to fill host memory.
> > > Also having a huge queue mainly cause to have a higher video > > > latency. > > > > > > Signed-off-by: Frediano Ziglio <fziglio@xxxxxxxxxx> > > > --- > > > server/stream-channel.c | 41 > > > ++++++++++++++++++++++++++++++++++++++++- > > > server/stream-channel.h | 10 ++++++++++ > > > server/stream-device.c | 34 +++++++++++++++++++++++++++++++++- > > > 3 files changed, 83 insertions(+), 2 deletions(-) > > > > > > diff --git a/server/stream-channel.c b/server/stream-channel.c > > > index 58c550e..966dd77 100644 > > > --- a/server/stream-channel.c > > > +++ b/server/stream-channel.c > > > @@ -68,9 +68,15 @@ struct StreamChannel { > > > /* size of the current video stream */ > > > unsigned width, height; > > > > > > + StreamQueueStat queue_stat; > > > + > > > /* callback to notify when a stream should be started or > > > stopped > > > */ > > > stream_channel_start_proc start_cb; > > > void *start_opaque; > > > + > > > + /* callback to notify when queue statistics changes */ > > > + stream_channel_queue_stat_proc queue_cb; > > > + void *queue_opaque; > > > }; > > > > > > struct StreamChannelClass { > > > @@ -95,6 +101,7 @@ typedef struct StreamCreateItem { > > > > > > typedef struct StreamDataItem { > > > RedPipeItem base; > > > + StreamChannel *channel; > > > // NOTE: this must be the last field in the structure > > > SpiceMsgDisplayStreamData data; > > > } StreamDataItem; > > > @@ -450,6 +457,27 @@ stream_channel_change_format(StreamChannel > > > *channel, const StreamMsgFormat *fmt) > > > red_pipe_item_unref(&item->base); > > > } > > > > > > +static inline void > > > +stream_channel_update_queue_stat(StreamChannel *channel, > > > + int32_t num_diff, int32_t > > > size_diff) > > > +{ > > > + channel->queue_stat.num_items += num_diff; > > > + channel->queue_stat.size += size_diff; > > > + if (channel->queue_cb) { > > > + channel->queue_cb(channel->queue_opaque, &channel- > > > > queue_stat, channel); > > > > > > + } > > > +} > > > + > > > +static void > > > 
+data_item_free(RedPipeItem *base) > > > +{ > > > + StreamDataItem *pipe_item = SPICE_UPCAST(StreamDataItem, > > > base); > > > + > > > + stream_channel_update_queue_stat(pipe_item->channel, -1, > > > -pipe_item->data.data_size); > > > + > > > + free(pipe_item); > > > +} > > > + > > > void > > > stream_channel_send_data(StreamChannel *channel, const void > > > *data, > > > size_t size) > > > { > > > @@ -460,10 +488,13 @@ stream_channel_send_data(StreamChannel > > > *channel, const void *data, size_t size) > > > RedChannel *red_channel = RED_CHANNEL(channel); > > > > > > StreamDataItem *item = spice_malloc(sizeof(*item) + size); > > > - red_pipe_item_init(&item->base, > > > RED_PIPE_ITEM_TYPE_STREAM_DATA); > > > + red_pipe_item_init_full(&item->base, > > > RED_PIPE_ITEM_TYPE_STREAM_DATA, > > > + data_item_free); > > > item->data.base.id = channel->stream_id; > > > item->data.base.multi_media_time = reds_get_mm_time(); > > > item->data.data_size = size; > > > + item->channel = channel; > > > + stream_channel_update_queue_stat(channel, 1, size); > > > // TODO try to optimize avoiding the copy > > > memcpy(item->data.data, data, size); > > > red_channel_pipes_new_add(red_channel, pipe_item_new_ref, > > > item); > > > @@ -479,6 +510,14 @@ > > > stream_channel_register_start_cb(StreamChannel > > > *channel, > > > } > > > > > > void > > > +stream_channel_register_queue_stat_cb(StreamChannel *channel, > > > + stream_channel_queue_stat_ > > > proc > > > cb, void *opaque) > > > +{ > > > + channel->queue_cb = cb; > > > + channel->queue_opaque = opaque; > > > +} > > > + > > > +void > > > stream_channel_reset(StreamChannel *channel) > > > { > > > RedChannel *red_channel = RED_CHANNEL(channel); > > > diff --git a/server/stream-channel.h b/server/stream-channel.h > > > index 8c9c0f1..ca20b6a 100644 > > > --- a/server/stream-channel.h > > > +++ b/server/stream-channel.h > > > @@ -65,6 +65,16 @@ typedef void (*stream_channel_start_proc)(void > > > *opaque, struct StreamMsgStartSto 
> > > void stream_channel_register_start_cb(StreamChannel *channel, > > > stream_channel_start_proc > > > cb, > > > void *opaque); > > > > > > +typedef struct StreamQueueStat { > > > + uint32_t num_items; > > > + uint32_t size; > > > +} StreamQueueStat; > > > + > > > +typedef void (*stream_channel_queue_stat_proc)(void *opaque, > > > const > > > StreamQueueStat *stats, > > > + StreamChannel > > > *channel); > > > +void stream_channel_register_queue_stat_cb(StreamChannel > > > *channel, > > > + stream_channel_queue_ > > > stat > > > _proc cb, void *opaque); > > > + > > > G_END_DECLS > > > > > > #endif /* STREAM_CHANNEL_H_ */ > > > diff --git a/server/stream-device.c b/server/stream-device.c > > > index b8a9dac..2b1e2f2 100644 > > > --- a/server/stream-device.c > > > +++ b/server/stream-device.c > > > @@ -44,6 +44,7 @@ struct StreamDevice { > > > uint8_t hdr_pos; > > > bool has_error; > > > bool opened; > > > + bool flow_stopped; > > > StreamChannel *channel; > > > }; > > > > > > @@ -67,7 +68,7 @@ stream_device_read_msg_from_dev(RedCharDevice > > > *self, SpiceCharDeviceInstance *si > > > SpiceCharDeviceInterface *sif; > > > int n; > > > > > > - if (dev->has_error) { > > > + if (dev->has_error || dev->flow_stopped) { > > > return NULL; > > > } > > > > > > @@ -165,6 +166,9 @@ handle_msg_data(StreamDevice *dev, > > > SpiceCharDeviceInstance *sin) > > > if (n <= 0) { > > > break; > > > } > > > + // TODO collect all message ?? 
> > > + // up: we send a single frame together > > > + // down: guest can cause a crash > > > stream_channel_send_data(dev->channel, buf, n); > > > dev->hdr.size -= n; > > > } > > > @@ -218,6 +222,33 @@ stream_device_stream_start(void *opaque, > > > StreamMsgStartStop *start, > > > red_char_device_write_buffer_add(char_dev, buf); > > > } > > > > > > +static void > > > +stream_device_stream_queue_stat(void *opaque, const > > > StreamQueueStat > > > *stats G_GNUC_UNUSED, > > > + StreamChannel *channel > > > G_GNUC_UNUSED) > > > +{ > > > + StreamDevice *dev = (StreamDevice *) opaque; > > > + > > > + if (!dev->opened) { > > > + return; > > > + } > > > + > > > + // very easy control flow... if any data stop > > > + // this seems a very small queue but as we use tcp > > > + // there's already that queue > > > + if (stats->num_items) { > > > + dev->flow_stopped = true; > > > + return; > > > + } > > > + > > > + if (dev->flow_stopped) { > > > + dev->flow_stopped = false; > > > + // TODO resume flow... > > > + // avoid recursion if we need to call get data from data > > > handling from > > > + // data handling > > > + red_char_device_wakeup(&dev->parent); > > > + } > > > +} > > > + > > > RedCharDevice * > > > stream_device_connect(RedsState *reds, SpiceCharDeviceInstance > > > *sin) > > > { > > > @@ -228,6 +259,7 @@ stream_device_connect(RedsState *reds, > > > SpiceCharDeviceInstance *sin) > > > StreamDevice *dev = stream_device_new(sin, reds); > > > dev->channel = channel; > > > stream_channel_register_start_cb(channel, > > > stream_device_stream_start, dev); > > > + stream_channel_register_queue_stat_cb(channel, > > > stream_device_stream_queue_stat, dev); > > > > > > sif = spice_char_device_get_interface(sin); > > > if (sif->state) { _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel