Signed-off-by: Francois Gouget <fgouget@xxxxxxxxxxxxxxx> --- src/channel-display-mjpeg.c | 142 ++++++++++++++++++++++++++++--- src/channel-display-priv.h | 10 ++- src/channel-display.c | 201 +++++++++++--------------------------------- 3 files changed, 189 insertions(+), 164 deletions(-) diff --git a/src/channel-display-mjpeg.c b/src/channel-display-mjpeg.c index 927827b..f3f4ceb 100644 --- a/src/channel-display-mjpeg.c +++ b/src/channel-display-mjpeg.c @@ -31,11 +31,16 @@ typedef struct MJpegDecoder { /* ---------- The builtin mjpeg decoder ---------- */ - SpiceMsgIn *frame_msg; struct jpeg_source_mgr mjpeg_src; struct jpeg_decompress_struct mjpeg_cinfo; struct jpeg_error_mgr mjpeg_jerr; + /* ---------- Frame queue ---------- */ + + GQueue *msgq; + SpiceMsgIn *cur_frame_msg; + guint timer_id; + /* ---------- Output frame data ---------- */ uint8_t *out_frame; @@ -50,7 +55,7 @@ static void mjpeg_src_init(struct jpeg_decompress_struct *cinfo) MJpegDecoder *decoder = SPICE_CONTAINEROF(cinfo->src, MJpegDecoder, mjpeg_src); uint8_t *data; - cinfo->src->bytes_in_buffer = spice_msg_in_frame_data(decoder->frame_msg, &data); + cinfo->src->bytes_in_buffer = spice_msg_in_frame_data(decoder->cur_frame_msg, &data); cinfo->src->next_input_byte = data; } @@ -72,10 +77,12 @@ static void mjpeg_src_term(struct jpeg_decompress_struct *cinfo) } -/* ---------- VideoDecoder's public API ---------- */ +/* ---------- Decoder proper ---------- */ -static uint8_t* mjpeg_decoder_decode_frame(VideoDecoder *video_decoder, - SpiceMsgIn *frame_msg) +static void mjpeg_decoder_schedule(MJpegDecoder *decoder); + +/* main context */ +static gboolean mjpeg_decoder_decode_frame(gpointer video_decoder) { MJpegDecoder *decoder = (MJpegDecoder*)video_decoder; gboolean back_compat = decoder->base.stream->channel->priv->peer_hdr.major_version == 1; @@ -84,8 +91,7 @@ static uint8_t* mjpeg_decoder_decode_frame(VideoDecoder *video_decoder, uint8_t *dest; uint8_t *lines[4]; - decoder->frame_msg = frame_msg; - stream_get_dimensions(decoder->base.stream, frame_msg, &width, &height); + stream_get_dimensions(decoder->base.stream, decoder->cur_frame_msg, &width, &height); if (decoder->out_size < width * height * 4) { g_free(decoder->out_frame); decoder->out_size = width * height * 4; @@ -118,7 +124,7 @@ static uint8_t* mjpeg_decoder_decode_frame(VideoDecoder *video_decoder, */ if (decoder->mjpeg_cinfo.rec_outbuf_height > G_N_ELEMENTS(lines)) { jpeg_abort_decompress(&decoder->mjpeg_cinfo); - g_return_val_if_reached(NULL); + g_return_val_if_reached(G_SOURCE_REMOVE); } while (decoder->mjpeg_cinfo.output_scanline < decoder->mjpeg_cinfo.output_height) { @@ -161,12 +167,125 @@ static uint8_t* mjpeg_decoder_decode_frame(VideoDecoder *video_decoder, } jpeg_finish_decompress(&decoder->mjpeg_cinfo); - return decoder->out_frame; + /* Display the frame and dispose of it */ + stream_display_frame(decoder->base.stream, decoder->cur_frame_msg, decoder->out_frame); + spice_msg_in_unref(decoder->cur_frame_msg); + decoder->cur_frame_msg = NULL; + decoder->timer_id = 0; + + /* Schedule the next frame */ + mjpeg_decoder_schedule(decoder); + + return G_SOURCE_REMOVE; +} + +/* ---------- VideoDecoder's queue scheduling ---------- */ + +static void mjpeg_decoder_schedule(MJpegDecoder *decoder) +{ + SPICE_DEBUG("%s", __FUNCTION__); + if (decoder->timer_id) { + return; + } + + guint32 time = stream_get_time(decoder->base.stream); + SpiceMsgIn *frame_msg = decoder->cur_frame_msg; + decoder->cur_frame_msg = NULL; + do { + if (frame_msg) { + SpiceStreamDataHeader *op = spice_msg_in_parsed(frame_msg); + if (time <= op->multi_media_time) { + guint32 d = op->multi_media_time - time; + decoder->cur_frame_msg = frame_msg; + decoder->timer_id = g_timeout_add(d, mjpeg_decoder_decode_frame, decoder); + break; + } + + SPICE_DEBUG("%s: rendering too late by %u ms (ts: %u, mmtime: %u), dropping ", + __FUNCTION__, time - op->multi_media_time, + op->multi_media_time, time); + stream_dropped_frame(decoder->base.stream); + spice_msg_in_unref(frame_msg); + } + frame_msg = g_queue_pop_head(decoder->msgq); + } while (frame_msg); +} + + +/* mjpeg_decoder_drop_queue() helper */ +static void _msg_in_unref_func(gpointer data, gpointer user_data) +{ + spice_msg_in_unref(data); +} + +static void mjpeg_decoder_drop_queue(MJpegDecoder *decoder) +{ + if (decoder->timer_id != 0) { + g_source_remove(decoder->timer_id); + decoder->timer_id = 0; + } + if (decoder->cur_frame_msg) { + spice_msg_in_unref(decoder->cur_frame_msg); + decoder->cur_frame_msg = NULL; + } + g_queue_foreach(decoder->msgq, _msg_in_unref_func, NULL); + g_queue_clear(decoder->msgq); +} + +/* ---------- VideoDecoder's public API ---------- */ + +static void mjpeg_decoder_queue_frame(VideoDecoder *video_decoder, + SpiceMsgIn *frame_msg, int32_t latency) +{ + MJpegDecoder *decoder = (MJpegDecoder*)video_decoder; + SpiceMsgIn *last_msg; + + SPICE_DEBUG("%s", __FUNCTION__); + + last_msg = g_queue_peek_tail(decoder->msgq); + if (last_msg) { + SpiceStreamDataHeader *last_op, *frame_op; + last_op = spice_msg_in_parsed(last_msg); + frame_op = spice_msg_in_parsed(frame_msg); + if (frame_op->multi_media_time < last_op->multi_media_time) { + /* This should really not happen */ + SPICE_DEBUG("new-frame-time < last-frame-time (%u < %u):" + " resetting stream, id %d", + frame_op->multi_media_time, + last_op->multi_media_time, frame_op->id); + mjpeg_decoder_drop_queue(decoder); + } + } + + /* Dropped MJPEG frames don't impact the ones that come after. + * So drop late frames as early as possible to save on processing time. + */ + if (latency < 0) { + return; + } + + spice_msg_in_ref(frame_msg); + g_queue_push_tail(decoder->msgq, frame_msg); + mjpeg_decoder_schedule(decoder); +} + +static void mjpeg_decoder_reschedule(VideoDecoder *video_decoder) +{ + MJpegDecoder *decoder = (MJpegDecoder*)video_decoder; + + SPICE_DEBUG("%s", __FUNCTION__); + if (decoder->timer_id != 0) { + g_source_remove(decoder->timer_id); + decoder->timer_id = 0; + } + mjpeg_decoder_schedule(decoder); } static void mjpeg_decoder_destroy(VideoDecoder* video_decoder) { MJpegDecoder *decoder = (MJpegDecoder*)video_decoder; + + mjpeg_decoder_drop_queue(decoder); jpeg_destroy_decompress(&decoder->mjpeg_cinfo); g_free(decoder->out_frame); free(decoder); @@ -180,10 +299,13 @@ VideoDecoder* create_mjpeg_decoder(int codec_type, display_stream *stream) MJpegDecoder *decoder = spice_new0(MJpegDecoder, 1); decoder->base.destroy = mjpeg_decoder_destroy; - decoder->base.decode_frame = mjpeg_decoder_decode_frame; + decoder->base.reschedule = mjpeg_decoder_reschedule; + decoder->base.queue_frame = mjpeg_decoder_queue_frame; decoder->base.codec_type = codec_type; decoder->base.stream = stream; + decoder->msgq = g_queue_new(); + decoder->mjpeg_cinfo.err = jpeg_std_error(&decoder->mjpeg_jerr); jpeg_create_decompress(&decoder->mjpeg_cinfo); diff --git a/src/channel-display-priv.h b/src/channel-display-priv.h index 5256ad9..92cba50 100644 --- a/src/channel-display-priv.h +++ b/src/channel-display-priv.h @@ -41,6 +41,9 @@ struct VideoDecoder { /* Releases the video decoder's resources */ void (*destroy)(VideoDecoder *decoder); + /* Notifies the decoder that the mm-time clock changed. */ + void (*reschedule)(VideoDecoder *video_decoder); + /* Decompresses the specified frame. * * @decoder: The video decoder. @@ -49,7 +52,7 @@ struct VideoDecoder { * buffer will be invalidated by the next call to * decode_frame(). */ - uint8_t* (*decode_frame)(VideoDecoder *decoder, SpiceMsgIn *frame_msg); + void (*queue_frame)(VideoDecoder *decoder, SpiceMsgIn *frame_msg, int32_t latency); /* The format of the encoded video. */ int codec_type; @@ -98,8 +101,6 @@ struct display_stream { VideoDecoder *video_decoder; - GQueue *msgq; - guint timeout; SpiceChannel *channel; /* stats */ @@ -127,6 +128,9 @@ struct display_stream { }; void stream_get_dimensions(display_stream *st, SpiceMsgIn *frame_msg, int *width, int *height); +guint32 stream_get_time(display_stream *st); +void stream_dropped_frame(display_stream *st); +void stream_display_frame(display_stream *st, SpiceMsgIn *frame_msg, uint8_t* data); uint32_t spice_msg_in_frame_data(SpiceMsgIn *frame_msg, uint8_t **data); diff --git a/src/channel-display.c b/src/channel-display.c index adba315..e591add 100644 --- a/src/channel-display.c +++ b/src/channel-display.c @@ -106,11 +106,9 @@ static void channel_set_handlers(SpiceChannelClass *klass); static void clear_surfaces(SpiceChannel *channel, gboolean keep_primary); static void clear_streams(SpiceChannel *channel); static display_surface *find_surface(SpiceDisplayChannelPrivate *c, guint32 surface_id); -static gboolean display_stream_render(display_stream *st); static void spice_display_channel_reset(SpiceChannel *channel, gboolean migrating); static void spice_display_channel_reset_capabilities(SpiceChannel *channel); static void destroy_canvas(display_surface *surface); -static void _msg_in_unref_func(gpointer data, gpointer user_data); static void display_session_mm_time_reset_cb(SpiceSession *session, gpointer data); static SpiceGlScanout* spice_gl_scanout_copy(const SpiceGlScanout *scanout); @@ -1090,7 +1088,6 @@ static void display_handle_stream_create(SpiceChannel *channel, SpiceMsgIn *in) spice_msg_in_ref(in); st->clip = &op->clip; st->surface = find_surface(c, op->surface_id); - st->msgq = g_queue_new(); st->channel = channel; st->drops_seqs_stats_arr = g_array_new(FALSE, FALSE, sizeof(drops_sequence_stats)); @@ -1109,45 +1106,6 @@ static void display_handle_stream_create(SpiceChannel *channel, SpiceMsgIn *in) } } -/* coroutine or main context */ -static gboolean display_stream_schedule(display_stream *st) -{ - SpiceSession *session = spice_channel_get_session(st->channel); - guint32 time, d; - SpiceStreamDataHeader *op; - SpiceMsgIn *in; - - SPICE_DEBUG("%s", __FUNCTION__); - if (st->timeout || !session) - return TRUE; - - time = spice_session_get_mm_time(session); - in = g_queue_peek_head(st->msgq); - - if (in == NULL) { - return TRUE; - } - - op = spice_msg_in_parsed(in); - if (time < op->multi_media_time) { - d = op->multi_media_time - time; - SPICE_DEBUG("scheduling next stream render in %u ms", d); - st->timeout = g_timeout_add(d, (GSourceFunc)display_stream_render, st); - return TRUE; - } else { - SPICE_DEBUG("%s: rendering too late by %u ms (ts: %u, mmtime: %u), dropping ", - __FUNCTION__, time - op->multi_media_time, - op->multi_media_time, time); - in = g_queue_pop_head(st->msgq); - spice_msg_in_unref(in); - st->num_drops_on_playback++; - if (g_queue_get_length(st->msgq) == 0) - return TRUE; - } - - return FALSE; -} - static SpiceRect *stream_get_dest(display_stream *st, SpiceMsgIn *frame_msg) { if (frame_msg == NULL || @@ -1210,66 +1168,54 @@ void stream_get_dimensions(display_stream *st, SpiceMsgIn *frame_msg, int *width } } -/* main context */ -static gboolean display_stream_render(display_stream *st) +G_GNUC_INTERNAL +guint32 stream_get_time(display_stream *st) { - SpiceMsgIn *in; + SpiceSession *session = spice_channel_get_session(st->channel); + return session ? spice_session_get_mm_time(session) : 0; +} - st->timeout = 0; - do { - in = g_queue_pop_head(st->msgq); +/* coroutine or main context */ +G_GNUC_INTERNAL +void stream_dropped_frame(display_stream *st) +{ + st->num_drops_on_playback++; +} - g_return_val_if_fail(in != NULL, FALSE); +/* main context */ +G_GNUC_INTERNAL +void stream_display_frame(display_stream *st, SpiceMsgIn *frame_msg, + uint8_t* data) +{ + int width, height; + SpiceRect *dest; + int stride; - uint8_t *out_frame = NULL; - if (st->video_decoder) { - out_frame = st->video_decoder->decode_frame(st->video_decoder, in); - } - if (out_frame) { - int width; - int height; - SpiceRect *dest; - uint8_t *data; - int stride; - - stream_get_dimensions(st, in, &width, &height); - dest = stream_get_dest(st, in); - - data = out_frame; - stride = width * sizeof(uint32_t); - if (!(stream_get_flags(st) & SPICE_STREAM_FLAGS_TOP_DOWN)) { - data += stride * (height - 1); - stride = -stride; - } + stream_get_dimensions(st, frame_msg, &width, &height); + dest = stream_get_dest(st, frame_msg); - st->surface->canvas->ops->put_image( - st->surface->canvas, + stride = width * sizeof(uint32_t); + if (!(stream_get_flags(st) & SPICE_STREAM_FLAGS_TOP_DOWN)) { + data += stride * (height - 1); + stride = -stride; + } + + st->surface->canvas->ops->put_image( + st->surface->canvas, #ifdef G_OS_WIN32 - SPICE_DISPLAY_CHANNEL(st->channel)->priv->dc, + SPICE_DISPLAY_CHANNEL(st->channel)->priv->dc, #endif - dest, data, - width, height, stride, - st->have_region ? &st->region : NULL); - - if (st->surface->primary) - g_signal_emit(st->channel, signals[SPICE_DISPLAY_INVALIDATE], 0, - dest->left, dest->top, - dest->right - dest->left, - dest->bottom - dest->top); - } + dest, data, + width, height, stride, + st->have_region ? &st->region : NULL); - spice_msg_in_unref(in); - - in = g_queue_peek_head(st->msgq); - if (in == NULL) - break; - - if (display_stream_schedule(st)) - return FALSE; - } while (1); - - return FALSE; + if (st->surface->primary) + g_signal_emit(st->channel, signals[SPICE_DISPLAY_INVALIDATE], 0, + dest->left, dest->top, + dest->right - dest->left, + dest->bottom - dest->top); } + /* after a sequence of 3 drops, push a report to the server, even * if the report window is bigger */ #define STREAM_REPORT_DROP_SEQ_LEN_LIMIT 3 @@ -1330,17 +1276,6 @@ static void display_update_stream_report(SpiceDisplayChannel *channel, uint32_t } } -static void display_stream_reset_rendering_timer(display_stream *st) -{ - SPICE_DEBUG("%s", __FUNCTION__); - if (st->timeout != 0) { - g_source_remove(st->timeout); - st->timeout = 0; - } - while (!display_stream_schedule(st)) { - } -} - /* * Migration can occur between 2 spice-servers with different mm-times. * Then, the following cases can happen after migration completes: @@ -1370,8 +1305,9 @@ static void display_stream_reset_rendering_timer(display_stream *st) * case 2 is less likely, since at takes at least 20 frames till the dst-server re-identifies * the video stream and starts sending stream data * - * display_session_mm_time_reset_cb handles case 1.a, and - * display_stream_test_frames_mm_time_reset handles case 2.b + * display_session_mm_time_reset_cb handles case 1.a by notifying the + * video decoders through their reschedule() method, and case 2.b is handled + * directly by the video decoders in their queue_frame() method */ /* main context */ @@ -1391,36 +1327,7 @@ static void display_session_mm_time_reset_cb(SpiceSession *session, gpointer dat } SPICE_DEBUG("%s: stream-id %d", __FUNCTION__, i); st = c->streams[i]; - display_stream_reset_rendering_timer(st); - } -} - -/* coroutine context */ -static void display_stream_test_frames_mm_time_reset(display_stream *st, - SpiceMsgIn *new_frame_msg, - guint32 mm_time) -{ - SpiceStreamDataHeader *tail_op, *new_op; - SpiceMsgIn *tail_msg; - - SPICE_DEBUG("%s", __FUNCTION__); - g_return_if_fail(new_frame_msg != NULL); - tail_msg = g_queue_peek_tail(st->msgq); - if (!tail_msg) { - return; - } - tail_op = spice_msg_in_parsed(tail_msg); - new_op = spice_msg_in_parsed(new_frame_msg); - - if (new_op->multi_media_time < tail_op->multi_media_time) { - SPICE_DEBUG("new-frame-time < tail-frame-time (%u < %u):" - " reseting stream, id %d", - new_op->multi_media_time, - tail_op->multi_media_time, - new_op->id); - g_queue_foreach(st->msgq, _msg_in_unref_func, NULL); - g_queue_clear(st->msgq); - display_stream_reset_rendering_timer(st); + st->video_decoder->reschedule(st->video_decoder); } } @@ -1440,7 +1347,7 @@ static void display_handle_stream_data(SpiceChannel *channel, SpiceMsgIn *in) g_return_if_fail(c->nstreams > op->id); st = c->streams[op->id]; - mmtime = spice_session_get_mm_time(spice_channel_get_session(channel)); + mmtime = stream_get_time(st); if (spice_msg_in_type(in) == SPICE_MSG_DISPLAY_STREAM_DATA_SIZED) { CHANNEL_DEBUG(channel, "stream %d contains sized data", op->id); @@ -1470,11 +1377,6 @@ static void display_handle_stream_data(SpiceChannel *channel, SpiceMsgIn *in) st->playback_sync_drops_seq_len++; } else { CHANNEL_DEBUG(channel, "video latency: %d", latency); - spice_msg_in_ref(in); - display_stream_test_frames_mm_time_reset(st, in, mmtime); - g_queue_push_tail(st->msgq, in); - while (!display_stream_schedule(st)) { - } if (st->cur_drops_seq_stats.len) { st->cur_drops_seq_stats.duration = op->multi_media_time - st->cur_drops_seq_stats.start_mm_time; @@ -1484,6 +1386,12 @@ static void display_handle_stream_data(SpiceChannel *channel, SpiceMsgIn *in) } st->playback_sync_drops_seq_len = 0; } + + /* Let the video decoder queue the frames so it can optimize their + * decoding and best decide if/when to drop them when they are late, + * taking into account the impact on later frames. + */ + st->video_decoder->queue_frame(st->video_decoder, in, latency); if (c->enable_adaptive_streaming) { display_update_stream_report(SPICE_DISPLAY_CHANNEL(channel), op->id, op->multi_media_time, latency); @@ -1516,11 +1424,6 @@ static void display_handle_stream_clip(SpiceChannel *channel, SpiceMsgIn *in) display_update_stream_region(st); } -static void _msg_in_unref_func(gpointer data, gpointer user_data) -{ - spice_msg_in_unref(data); -} - static void destroy_stream(SpiceChannel *channel, int id) { SpiceDisplayChannelPrivate *c = SPICE_DISPLAY_CHANNEL(channel)->priv; @@ -1574,10 +1477,6 @@ static void destroy_stream(SpiceChannel *channel, int id) spice_msg_in_unref(st->msg_clip); spice_msg_in_unref(st->msg_create); - g_queue_foreach(st->msgq, _msg_in_unref_func, NULL); - g_queue_free(st->msgq); - if (st->timeout != 0) - g_source_remove(st->timeout); g_free(st); c->streams[id] = NULL; } -- 2.8.0.rc3 _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel