On Mon, 2016-02-29 at 15:46 +0530, arun at accosted.net wrote: >  module_rtp_send_la_SOURCES = modules/rtp/module-rtp-send.c >  module_rtp_send_la_LDFLAGS = $(MODULE_LDFLAGS) >  module_rtp_send_la_LIBADD = $(MODULE_LIBADD) librtp.la > -module_rtp_send_la_CFLAGS = $(AM_CFLAGS) > +module_rtp_send_la_CFLAGS = $(AM_CFLAGS) $(GSTREAMER_CFLAGS) >  >  module_rtp_recv_la_SOURCES = modules/rtp/module-rtp-recv.c >  module_rtp_recv_la_LDFLAGS = $(MODULE_LDFLAGS) >  module_rtp_recv_la_LIBADD = $(MODULE_LIBADD) librtp.la > -module_rtp_recv_la_CFLAGS = $(AM_CFLAGS) > +module_rtp_recv_la_CFLAGS = $(AM_CFLAGS) $(GSTREAMER_CFLAGS) >  >  # JACK >  > @@ -2185,7 +2193,7 @@ module_bluez5_device_la_CFLAGS = $(AM_CFLAGS) $(SBC_CFLAGS) >  module_raop_sink_la_SOURCES = modules/raop/module-raop-sink.c >  module_raop_sink_la_LDFLAGS = $(MODULE_LDFLAGS) >  module_raop_sink_la_LIBADD = $(MODULE_LIBADD) librtp.la libraop.la > -module_raop_sink_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/src/modules/rtp > +module_raop_sink_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/src/modules/rtp $(GSTREAMER_CFLAGS) Adding GSTREAMER_CFLAGS to the module CFLAGS seems unnecessary. Only librtp contains GStreamer code. > +typedef struct pa_rtp_context { The typedef is already done in rtp.h, no need to do it again. > +pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) { > +    pa_rtp_context *c = NULL; > +    GError *error = NULL; > + > +    pa_assert(fd >= 0); > + > +    c = pa_xnew0(pa_rtp_context, 1); > + > +    c->fdsem = pa_fdsem_new(); As far as I can tell, the fdsem is only needed when receiving data, not when sending. 
> +    c->ss = *ss; > + > +    if (!gst_init_check(NULL, NULL, &error)) { > +        pa_log_error("Could not initialise GStreamer: %s", error->message); > +        g_error_free(error); > +        goto fail; > +    } > + > +    if (!init_send_pipeline(c, fd, payload, mtu, ss)) > +        goto fail; > + > +    return c; > + > +fail: > +    pa_xfree(c); You should call pa_rtp_context_free() to be sure that everything gets properly deinitialized. Now you're leaking the fdsem. The same comment applies to pa_rtp_context_new_recv() too. > +static bool process_bus_messages(pa_rtp_context *c) { > +    GstBus *bus; > +    GstMessage *message; > +    bool ret = true; > + > +    bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline)); > + > +    while (ret && (message = gst_bus_pop(bus))) { > +        if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_ERROR) { > +            GError *error = NULL; > + > +            ret = false; > + > +            gst_message_parse_error(message, &error, NULL); > +            pa_log("Got an error: %s", error->message); > + > +            g_error_free(error); > + > +            pa_fdsem_post(c->fdsem); What's the purpose of this? If I understand correctly, the fdsem is used to wake up the pulseaudio source IO thread when we receive data in a gstreamer thread, but this code runs in the IO thread, so this seems pointless. (By the way, it would be good to have comments in each function about which thread they run in.) 
> +int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) { > +    pa_memchunk chunk = { 0, }; > +    GstBuffer *buf; > +    void *data; > +    bool stop = false; > +    int ret = 0; > + > +    pa_assert(c); > +    pa_assert(q); > + > +    if (!process_bus_messages(c)) > +        return -1; > + > +    while (!stop && pa_memblockq_peek(q, &chunk) == 0) { > +        pa_assert(chunk.memblock); > + > +        data = pa_memblock_acquire(chunk.memblock); > + > +        buf = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY | GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS, > +                                          data, chunk.length, chunk.index, chunk.length, chunk.memblock, > +                                          (GDestroyNotify) free_buffer); > + > +        if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) { > +            pa_log_error("Could not push buffer"); > +            stop = true; > +            ret = -1; > +        } > + > +        pa_memblockq_drop(q, chunk.length); > +    } > + > +    return ret; > +} I wonder about the error handling in this function. module-rtp-send.c, which calls this function, doesn't care about the return value. Unloading the module would make sense to me, but if you don't do that, how do you ensure that we don't end up spamming errors to the log infinitely if the pipeline stops working? 
> +static void on_pad_added(GstElement *element, GstPad *pad, gpointer userdata) { > +    pa_rtp_context *c = (pa_rtp_context *) userdata; > +    GstElement *depay; > +    GstPad *sinkpad; > +    GstPadLinkReturn ret; > + > +    depay = gst_bin_get_by_name(GST_BIN(c->pipeline), "depay"); > +    pa_assert(depay); > + > +    sinkpad = gst_element_get_static_pad(depay, "sink"); > + > +    ret = gst_pad_link(pad, sinkpad); > +    if (ret != GST_PAD_LINK_OK) { > +        GstBus *bus; > +        GError *error; > + > +        bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline)); > +        error = g_error_new(GST_CORE_ERROR, GST_CORE_ERROR_PAD, "Could not link rtpbin to depayloader"); > +        gst_bus_post(bus, gst_message_new_error(GST_OBJECT(c->pipeline), error, NULL)); It's not clear to me how the messages are dispatched. How does the sink IO thread get notified of this message? Messages are processed in pa_rtp_recv(), which I think is called when we post to the fdsem, but we don't call pa_fdsem_post() at least in this function. Does the error produce an EOS event, which then will trigger a pa_fdsem_post() call? 
> +static bool init_receive_pipeline(pa_rtp_context *c, int fd, const pa_sample_spec *ss) { > +    GstElement *udpsrc = NULL, *rtpbin = NULL, *depay = NULL, *appsink = NULL; > +    GstCaps *caps; > +    GSocket *socket; > +    GError *error = NULL; > + > +    MAKE_ELEMENT(udpsrc, "udpsrc"); > +    MAKE_ELEMENT(rtpbin, "rtpbin"); > +    MAKE_ELEMENT_NAMED(depay, "rtpL16depay", "depay"); > +    MAKE_ELEMENT(appsink, "appsink"); > + > +    c->pipeline = gst_pipeline_new(NULL); > + > +    gst_bin_add_many(GST_BIN(c->pipeline), udpsrc, rtpbin, depay, appsink, NULL); > + > +    socket = g_socket_new_from_fd(fd, &error); > +    if (error) { > +        pa_log("Could not create socket: %s", error->message); > +        g_error_free(error); > +        goto fail; > +    } > + > +    caps = rtp_caps_from_sample_spec(ss); > +    if (!caps) { > +        pa_log("Unsupported format to payload"); > +        goto fail; > +    } > + > +    g_object_set(udpsrc, "socket", socket, "caps", caps, "auto-multicast" /* caller handles this */, FALSE, NULL); > +    g_object_set(rtpbin, "latency", 0, "buffer-mode", 0 /* none */, NULL); > +    g_object_set(appsink, "sync", FALSE, "enable-last-sample", FALSE, NULL); > + > +    gst_caps_unref(caps); > +    g_object_unref(socket); > + > +    if (!gst_element_link_pads(udpsrc, "src", rtpbin, "recv_rtp_sink_0") || > +        !gst_element_link(depay, appsink)) { > + > +        pa_log("Could not set up send pipeline"); s/send/receive/ > +int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) { > +    GstSample *sample = NULL; > +    GstBuffer *buf; > +    GstMapInfo info; > +    void *data; > + > +    if (!process_bus_messages(c)) > +        goto fail; > + > +    sample = 
gst_app_sink_pull_sample(GST_APP_SINK(c->appsink)); > +    if (!sample) { > +        pa_log_warn("Could not get any more data"); > +        goto fail; > +    } > + > +    buf = gst_sample_get_buffer(sample); > + > +    if (GST_BUFFER_IS_DISCONT(buf)) > +        pa_log_info("Discontinuity detected, possibly lost some packets"); > + > +    if (!gst_buffer_map(buf, &info, GST_MAP_READ)) > +        goto fail; It would be good to log something here. -- Tanu