On 21 April 2016 at 21:33, Tanu Kaskinen <tanuk at iki.fi> wrote: > On Mon, 2016-02-29 at 15:46 +0530, arun at accosted.net wrote: >> module_rtp_send_la_SOURCES = modules/rtp/module-rtp-send.c >> module_rtp_send_la_LDFLAGS = $(MODULE_LDFLAGS) >> module_rtp_send_la_LIBADD = $(MODULE_LIBADD) librtp.la >> -module_rtp_send_la_CFLAGS = $(AM_CFLAGS) >> +module_rtp_send_la_CFLAGS = $(AM_CFLAGS) $(GSTREAMER_CFLAGS) >> >> module_rtp_recv_la_SOURCES = modules/rtp/module-rtp-recv.c >> module_rtp_recv_la_LDFLAGS = $(MODULE_LDFLAGS) >> module_rtp_recv_la_LIBADD = $(MODULE_LIBADD) librtp.la >> -module_rtp_recv_la_CFLAGS = $(AM_CFLAGS) >> +module_rtp_recv_la_CFLAGS = $(AM_CFLAGS) $(GSTREAMER_CFLAGS) >> >> # JACK >> >> @@ -2185,7 +2193,7 @@ module_bluez5_device_la_CFLAGS = $(AM_CFLAGS) $(SBC_CFLAGS) >> module_raop_sink_la_SOURCES = modules/raop/module-raop-sink.c >> module_raop_sink_la_LDFLAGS = $(MODULE_LDFLAGS) >> module_raop_sink_la_LIBADD = $(MODULE_LIBADD) librtp.la libraop.la >> -module_raop_sink_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/src/modules/rtp >> +module_raop_sink_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/src/modules/rtp $(GSTREAMER_CFLAGS) > > Adding GSTREAMER_CFLAGS to the module CFLAGS seems unnecessary. Only > librtp contains GStreamer code. > >> +typedef struct pa_rtp_context { > > The typedef is already done in rtp.h, no need to do it again. > >> +pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) { >> + pa_rtp_context *c = NULL; >> + GError *error = NULL; >> + >> + pa_assert(fd >= 0); >> + >> + c = pa_xnew0(pa_rtp_context, 1); >> + >> + c->fdsem = pa_fdsem_new(); > > As far as I can tell, the fdsem is only needed when receiving data, not > when sending. Right, fixed. >> + c->ss = *ss; >> + >> + if (!gst_init_check(NULL, NULL, &error)) { >> + pa_log_error("Could not initialise GStreamer: %s", error->message); >> + g_error_free(error); >> + goto fail; >> + } >> + >> + if (!init_send_pipeline(c, fd, payload, mtu, ss)) >> + goto fail; >> + >> + return c; >> + >> +fail: >> + pa_xfree(c); > > You should call pa_rtp_context_free() to be sure that everything gets > properly deinitialized. Now you're leaking the fdsem. The same comment > applies to pa_rtp_context_new_recv() too. Done. >> +static bool process_bus_messages(pa_rtp_context *c) { >> + GstBus *bus; >> + GstMessage *message; >> + bool ret = true; >> + >> + bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline)); >> + >> + while (ret && (message = gst_bus_pop(bus))) { >> + if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_ERROR) { >> + GError *error = NULL; >> + >> + ret = false; >> + >> + gst_message_parse_error(message, &error, NULL); >> + pa_log("Got an error: %s", error->message); >> + >> + g_error_free(error); >> + >> + pa_fdsem_post(c->fdsem); > > What's the purpose of this? If I understand correctly, the fdsem is > used to wake up the pulseaudio source IO thread when we receive data in > a gstreamer thread, but this code runs in the IO thread, so this seems > pointless. > > (By the way, it would be good to have comments in each function about > which thread they run in.) Added comments about this. And you're right, this one is not needed. > >> +int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) { >> + pa_memchunk chunk = { 0, }; >> + GstBuffer *buf; >> + void *data; >> + bool stop = false; >> + int ret = 0; >> + >> + pa_assert(c); >> + pa_assert(q); >> + >> + if (!process_bus_messages(c)) >> + return -1; >> + >> + while (!stop && pa_memblockq_peek(q, &chunk) == 0) { >> + pa_assert(chunk.memblock); >> + >> + data = pa_memblock_acquire(chunk.memblock); >> + >> + buf = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY | GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS, >> + data, chunk.length, chunk.index, chunk.length, chunk.memblock, >> + (GDestroyNotify) free_buffer); >> + >> + if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) { >> + pa_log_error("Could not push buffer"); >> + stop = true; >> + ret = -1; >> + } >> + >> + pa_memblockq_drop(q, chunk.length); >> + } >> + >> + return ret; >> +} > > I wonder about the error handling in this function. module-rtp-send.c, > which calls this function, doesn't care about the return value. > Unloading the module would make sense to me, but if you don't do that, > how do you ensure that we don't end up spamming errors to the log > infinitely if the pipeline stops working? I can add another patch on top of this to unload the module of send fails. >> +static void on_pad_added(GstElement *element, GstPad *pad, gpointer userdata) { >> + pa_rtp_context *c = (pa_rtp_context *) userdata; >> + GstElement *depay; >> + GstPad *sinkpad; >> + GstPadLinkReturn ret; >> + >> + depay = gst_bin_get_by_name(GST_BIN(c->pipeline), "depay"); >> + pa_assert(depay); >> + >> + sinkpad = gst_element_get_static_pad(depay, "sink"); >> + >> + ret = gst_pad_link(pad, sinkpad); >> + if (ret != GST_PAD_LINK_OK) { >> + GstBus *bus; >> + GError *error; >> + >> + bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline)); >> + error = g_error_new(GST_CORE_ERROR, GST_CORE_ERROR_PAD, "Could not link rtpbin to depayloader"); >> + gst_bus_post(bus, gst_message_new_error(GST_OBJECT(c->pipeline), error, NULL)); > > It's not clear to me how the messages are dispatched. How does the sink > IO thread get notified of this message? Messages are processed in > pa_rtp_recv(), which is I think is called when we post to the fdsem, > but we don't call pa_fdsem_post() at least in this function. Does the > error produce an EOS event, which then will trigger a pa_fdsem_post() > call? No, we need to call pa_fdsem_post(). Will fix that. >> +static bool init_receive_pipeline(pa_rtp_context *c, int fd, const pa_sample_spec *ss) { >> + GstElement *udpsrc = NULL, *rtpbin = NULL, *depay = NULL, *appsink = NULL; >> + GstCaps *caps; >> + GSocket *socket; >> + GError *error = NULL; >> + >> + MAKE_ELEMENT(udpsrc, "udpsrc"); >> + MAKE_ELEMENT(rtpbin, "rtpbin"); >> + MAKE_ELEMENT_NAMED(depay, "rtpL16depay", "depay"); >> + MAKE_ELEMENT(appsink, "appsink"); >> + >> + c->pipeline = gst_pipeline_new(NULL); >> + >> + gst_bin_add_many(GST_BIN(c->pipeline), udpsrc, rtpbin, depay, appsink, NULL); >> + >> + socket = g_socket_new_from_fd(fd, &error); >> + if (error) { >> + pa_log("Could not create socket: %s", error->message); >> + g_error_free(error); >> + goto fail; >> + } >> + >> + caps = rtp_caps_from_sample_spec(ss); >> + if (!caps) { >> + pa_log("Unsupported format to payload"); >> + goto fail; >> + } >> + >> + g_object_set(udpsrc, "socket", socket, "caps", caps, "auto-multicast" /* caller handles this */, FALSE, NULL); >> + g_object_set(rtpbin, "latency", 0, "buffer-mode", 0 /* none */, NULL); >> + g_object_set(appsink, "sync", FALSE, "enable-last-sample", FALSE, NULL); >> + >> + gst_caps_unref(caps); >> + g_object_unref(socket); >> + >> + if (!gst_element_link_pads(udpsrc, "src", rtpbin, "recv_rtp_sink_0") || >> + !gst_element_link(depay, appsink)) { >> + >> + pa_log("Could not set up send pipeline"); > > s/send/receive/ > >> +int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) { >> + GstSample *sample = NULL; >> + GstBuffer *buf; >> + GstMapInfo info; >> + void *data; >> + >> + if (!process_bus_messages(c)) >> + goto fail; >> + >> + sample = gst_app_sink_pull_sample(GST_APP_SINK(c->appsink)); >> + if (!sample) { >> + pa_log_warn("Could not get any more data"); >> + goto fail; >> + } >> + >> + buf = gst_sample_get_buffer(sample); >> + >> + if (GST_BUFFER_IS_DISCONT(buf)) >> + pa_log_info("Discontinuity detected, possibly lost some packets"); >> + >> + if (!gst_buffer_map(buf, &info, GST_MAP_READ)) >> + goto fail; > > It would be good to log something here. Sure. This is just a sanity check though, and should never happen. -- Arun