On Fri, 2012-02-10 at 17:39 +0100, Frédéric Dalleau wrote: > Module-bluetooth-policy can load and unload module-loopback on demand. > Sometimes if there is an error, module-loopback can be unloaded early. > When module-loopback is loaded, it attaches a sink and the sink > calls sink_input_update_max_request for which there is a callback. > In this callback, module-loopback will queue a message in order to > process it from the main thread. The message is queued in the sink > thread queue. > > There is a possibility that this message is not processed if unload > has been requested. When this happens, the message > is still in the queue, and will eventually be processed. This triggers > a segfault because the message carries a pointer to the no > longer existing sink input. > > The fix is inspired by what pa_thread_mq does for registering a thread-specific > message queue. The idea is to create a module-specific queue. Any pending > message at unload will be flushed without processing. I would like the main thread (and all other threads too) to make sure that it doesn't process messages that have an invalid receiver. The main thread should have a registry of valid receivers that each incoming message is checked against. Sink inputs would register themselves there automatically when they are created and unregister when they are unlinked. What do you think? What do Colin, Arun and David think? 
-- Tanu > --- > src/modules/module-loopback.c | 51 ++++++++++++++++++++++++++++++++++++++++- > 1 files changed, 50 insertions(+), 1 deletions(-) > > diff --git a/src/modules/module-loopback.c b/src/modules/module-loopback.c > index 5c87c6c..b2df1ce 100644 > --- a/src/modules/module-loopback.c > +++ b/src/modules/module-loopback.c > @@ -77,6 +77,9 @@ struct userdata { > > pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write; > > + pa_asyncmsgq *requestmsgq; > + pa_io_event *io_event; > + > pa_time_event *time_event; > pa_usec_t adjust_time; > > @@ -137,6 +140,13 @@ static void teardown(struct userdata *u) { > pa_assert_ctl_context(); > > pa_asyncmsgq_flush(u->asyncmsgq, 0); > + pa_asyncmsgq_flush(u->requestmsgq, 0); > + > + if (u->io_event) { > + u->core->mainloop->io_free(u->io_event); > + u->io_event = NULL; > + } > + > if (u->sink_input) > pa_sink_input_unlink(u->sink_input); > > @@ -215,6 +225,38 @@ static void adjust_rates(struct userdata *u) { > pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time); > } > > +static void request_queue_event(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) { > + struct userdata *u = userdata; > + pa_asyncmsgq *aq; > + > + pa_assert(pa_asyncmsgq_read_fd(u->requestmsgq) == fd); > + pa_assert(events == PA_IO_EVENT_INPUT); > + > + pa_asyncmsgq_ref(aq = u->requestmsgq); > + pa_asyncmsgq_read_after_poll(aq); > + > + for (;;) { > + pa_msgobject *object; > + int code; > + void *data; > + int64_t offset; > + pa_memchunk chunk; > + > + /* Check whether there is a message for us to process */ > + while (pa_asyncmsgq_get(aq, &object, &code, &data, &offset, &chunk, 0) >= 0) { > + int ret; > + > + ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); > + pa_asyncmsgq_done(aq, ret); > + } > + > + if (pa_asyncmsgq_read_before_poll(aq) == 0) > + break; > + } > + > + pa_asyncmsgq_unref(aq); > +} > + > /* Called from main context */ > static void 
time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) { > struct userdata *u = userdata; > @@ -582,7 +624,7 @@ static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) { > > pa_memblockq_set_prebuf(u->memblockq, nbytes*2); > pa_log_info("Max request changed"); > - pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_MAX_REQUEST_CHANGED, NULL, 0, NULL, NULL); > + pa_asyncmsgq_post(u->requestmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_MAX_REQUEST_CHANGED, NULL, 0, NULL, NULL); > } > > /* Called from main thread */ > @@ -826,6 +868,10 @@ int pa__init(pa_module *m) { > > u->asyncmsgq = pa_asyncmsgq_new(0); > > + u->requestmsgq = pa_asyncmsgq_new(0); > + pa_assert_se(pa_asyncmsgq_read_before_poll(u->requestmsgq) == 0); > + u->io_event = u->core->mainloop->io_new(u->core->mainloop, pa_asyncmsgq_read_fd(u->requestmsgq), PA_IO_EVENT_INPUT, request_queue_event, u); > + > pa_sink_input_put(u->sink_input); > pa_source_output_put(u->source_output); > > @@ -857,6 +903,9 @@ void pa__done(pa_module*m) { > if (u->memblockq) > pa_memblockq_free(u->memblockq); > > + if (u->requestmsgq) > + pa_asyncmsgq_unref(u->requestmsgq); > + > if (u->asyncmsgq) > pa_asyncmsgq_unref(u->asyncmsgq); >