[PATCH v5 18/20] spice-gtk: Avoid copying the compressed message data in the GStreamer decoder.

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Signed-off-by: Francois Gouget <fgouget@xxxxxxxxxxxxxxx>
---

Changes since take 2:
 - None.

 src/channel-display-gst.c | 89 ++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 84 insertions(+), 5 deletions(-)

diff --git a/src/channel-display-gst.c b/src/channel-display-gst.c
index 3024356..59ce3e1 100644
--- a/src/channel-display-gst.c
+++ b/src/channel-display-gst.c
@@ -48,11 +48,43 @@ struct GstDecoder {
 
     /* ---------- Output frame data ---------- */
 
+    GMutex pipeline_mutex;
+    GCond pipeline_cond;
+    int pipeline_wait;
+    uint32_t samples_count;
+
     GstSample *sample;
     GstMapInfo mapinfo;
 };
 
 
+/* Signals that the pipeline is done processing the last buffer we gave it.
+ *
+ * @decoder:   The video decoder object.
+ * @samples:   How many samples to add to the available samples count.
+ */
+static void signal_pipeline(GstDecoder *decoder, uint32_t samples)
+{
+    g_mutex_lock(&decoder->pipeline_mutex);
+    decoder->pipeline_wait = 0;
+    decoder->samples_count += samples;
+    g_cond_signal(&decoder->pipeline_cond);
+    g_mutex_unlock(&decoder->pipeline_mutex);
+}
+
+static void appsrc_need_data_cb(GstAppSrc *src, guint length, gpointer user_data)
+{
+    GstDecoder *decoder = (GstDecoder*)user_data;
+    signal_pipeline(decoder, 0);
+}
+
+static GstFlowReturn appsink_new_sample_cb(GstAppSink *appsrc, gpointer user_data)
+{
+    GstDecoder *decoder = (GstDecoder*)user_data;
+    signal_pipeline(decoder, 1);
+    return GST_FLOW_OK;
+}
+
 /* ---------- GStreamer pipeline ---------- */
 
 static void reset_pipeline(GstDecoder *decoder)
@@ -66,10 +98,18 @@ static void reset_pipeline(GstDecoder *decoder)
     gst_object_unref(decoder->appsink);
     gst_object_unref(decoder->pipeline);
     decoder->pipeline = NULL;
+
+    g_mutex_clear(&decoder->pipeline_mutex);
+    g_cond_clear(&decoder->pipeline_cond);
 }
 
 static gboolean construct_pipeline(GstDecoder *decoder)
 {
+    g_mutex_init(&decoder->pipeline_mutex);
+    g_cond_init(&decoder->pipeline_cond);
+    decoder->pipeline_wait = 1;
+    decoder->samples_count = 0;
+
     const gchar *src_caps, *gstdec_name;
     switch (decoder->base.codec_type) {
     case SPICE_VIDEO_CODEC_TYPE_MJPEG:
@@ -101,7 +141,12 @@ static gboolean construct_pipeline(GstDecoder *decoder)
     }
 
     decoder->appsrc = GST_APP_SRC(gst_bin_get_by_name(GST_BIN(decoder->pipeline), "src"));
+    GstAppSrcCallbacks appsrc_cbs = {&appsrc_need_data_cb, NULL, NULL};
+    gst_app_src_set_callbacks(decoder->appsrc, &appsrc_cbs, decoder, NULL);
+
     decoder->appsink = GST_APP_SINK(gst_bin_get_by_name(GST_BIN(decoder->pipeline), "sink"));
+    GstAppSinkCallbacks appsink_cbs = {NULL, NULL, &appsink_new_sample_cb};
+    gst_app_sink_set_callbacks(decoder->appsink, &appsink_cbs, decoder, NULL);
 
     if (gst_element_set_state(decoder->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
         SPICE_DEBUG("GStreamer error: Unable to set the pipeline to the playing state.");
@@ -112,6 +157,11 @@ static gboolean construct_pipeline(GstDecoder *decoder)
     return TRUE;
 }
 
+static void release_msg_in(gpointer data)
+{
+    spice_msg_in_unref((SpiceMsgIn*)data);
+}
+
 static gboolean push_compressed_buffer(GstDecoder *decoder,
                                        SpiceMsgIn *frame_msg)
 {
@@ -122,10 +172,11 @@ static gboolean push_compressed_buffer(GstDecoder *decoder,
         return FALSE;
     }
 
-    // TODO.  Grr.  Seems like a wasted alloc
-    gpointer d = g_malloc(size);
-    memcpy(d, data, size);
-    GstBuffer *buffer = gst_buffer_new_wrapped(d, size);
+    /* Reference frame_msg so it stays around until our 'deallocator' releases it */
+    spice_msg_in_ref(frame_msg);
+    GstBuffer *buffer = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY,
+                                                    data, size, 0, size,
+                                                    frame_msg, &release_msg_in);
 
     if (gst_app_src_push_buffer(decoder->appsrc, buffer) != GST_FLOW_OK) {
         SPICE_DEBUG("GStreamer error: unable to push frame of size %d", size);
@@ -195,8 +246,36 @@ static uint8_t* gst_decoder_decode_frame(GstDecoder *decoder,
      */
     release_last_frame(decoder);
 
+    /* The pipeline may have called appsrc_need_data_cb() after we got the last
+     * output frame. This would cause us to return prematurely so reset
+     * pipeline_wait so we do wait for it to process this buffer.
+     */
+    g_mutex_lock(&decoder->pipeline_mutex);
+    decoder->pipeline_wait = 1;
+    g_mutex_unlock(&decoder->pipeline_mutex);
+    /* Note that it's possible for appsrc_need_data_cb() to get called between
+     * now and the pipeline wait. But this will at most cause a one frame delay.
+     */
+
     if (push_compressed_buffer(decoder, frame_msg)) {
-        return pull_raw_frame(decoder);
+        /* Wait for the pipeline to either produce a decoded frame, or ask
+         * for more data which means an error happened.
+         */
+        g_mutex_lock(&decoder->pipeline_mutex);
+        while (decoder->pipeline_wait) {
+            g_cond_wait(&decoder->pipeline_cond, &decoder->pipeline_mutex);
+        }
+        decoder->pipeline_wait = 1;
+        uint32_t samples = decoder->samples_count;
+        if (samples) {
+            decoder->samples_count--;
+        }
+        g_mutex_unlock(&decoder->pipeline_mutex);
+
+        /* If a decoded frame waits for us, return it */
+        if (samples) {
+            return pull_raw_frame(decoder);
+        }
     }
     return NULL;
 }
-- 
2.5.0

_______________________________________________
Spice-devel mailing list
Spice-devel@xxxxxxxxxxxxxxxxxxxxx
http://lists.freedesktop.org/mailman/listinfo/spice-devel




[Index of Archives]     [Linux ARM Kernel]     [Linux ARM]     [Linux Omap]     [Fedora ARM]     [IETF Annouce]     [Security]     [Bugtraq]     [Linux]     [Linux OMAP]     [Linux MIPS]     [ECOS]     [Asterisk Internet PBX]     [Linux API]     [Monitors]