On 27.06.2015 17:36, Tanu Kaskinen wrote: > This fixes a crash. sink_input_pop_cb() drains the message queue that receives > memchunks from the combine sink thread to avoid requesting more audio too soon. > The same message queue received also SET_REQUESTED_LATENCY messages, which > generate rewind requests. Rewind requests shouldn't be issued in the pop() > callback, doing so results in an assertion error. Therefore, it was not safe to > drain the message queue in the pop() callback, but usually the queue is empty, > so this bug was not immediately detected. > > This patch splits the message queue into two queues: audio_inq and control_inq. > audio_inq receives only messages containing memchunks, and control_inq receives > only the SET_REQUESTED_LATENCY messages. The pop() callback only drains the > audio queue, which avoids the rewind requests in the pop() callback. > > BugLink: https://bugs.freedesktop.org/show_bug.cgi?id=90489 > --- > src/modules/module-combine-sink.c | 108 ++++++++++++++++++++++++++++---------- > 1 file changed, 80 insertions(+), 28 deletions(-) > > diff --git a/src/modules/module-combine-sink.c b/src/modules/module-combine-sink.c > index 602fa87..b6322c6 100644 > --- a/src/modules/module-combine-sink.c > +++ b/src/modules/module-combine-sink.c > @@ -90,9 +90,27 @@ struct output { > pa_sink_input *sink_input; > bool ignore_state_change; > > - pa_asyncmsgq *inq, /* Message queue from the sink thread to this sink input */ > - *outq; /* Message queue from this sink input to the sink thread */ > - pa_rtpoll_item *inq_rtpoll_item_read, *inq_rtpoll_item_write; > + /* This message queue is only for POST messages, i.e. the messages that > + * carry audio data from the sink thread to the output thread. The POST > + * messages need to be handled in a separate queue, because the queue is > + * processed not only in the output thread mainloop, but also inside the > + * sink input pop() callback. Processing other messages (such as > + * SET_REQUESTED_LATENCY) is not safe inside the pop() callback; at least > + * one reason why it's not safe is that messages that generate rewind > + * requests (such as SET_REQUESTED_LATENCY) cause crashes when processed > + * in the pop() callback. */ > + pa_asyncmsgq *audio_inq; > + > + /* This message queue is for all other messages than POST from the sink > + * thread to the output thread (currently "all other messages" means just > + * the SET_REQUESTED_LATENCY message). */ > + pa_asyncmsgq *control_inq; > + > + /* Message queue from the output thread to the sink thread. */ > + pa_asyncmsgq *outq; > + > + pa_rtpoll_item *audio_inq_rtpoll_item_read, *audio_inq_rtpoll_item_write; > + pa_rtpoll_item *control_inq_rtpoll_item_read, *control_inq_rtpoll_item_write; > pa_rtpoll_item *outq_rtpoll_item_read, *outq_rtpoll_item_write; > > pa_memblockq *memblockq; > @@ -352,7 +370,7 @@ finish: > pa_log_debug("Thread shutting down"); > } > > -/* Called from I/O thread context */ > +/* Called from combine sink I/O thread context */ > static void render_memblock(struct userdata *u, struct output *o, size_t length) { > pa_assert(u); > pa_assert(o); > @@ -367,7 +385,7 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length) > > /* Maybe there's some data in the requesting output's queue > * now? */ > - while (pa_asyncmsgq_process_one(o->inq) > 0) > + while (pa_asyncmsgq_process_one(o->audio_inq) > 0) > ; > > /* Ok, now let's prepare some data if we really have to */ > @@ -385,7 +403,7 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length) > if (j == o) > continue; > > - pa_asyncmsgq_post(j->inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL); > + pa_asyncmsgq_post(j->audio_inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL); > } > > /* And place it directly into the requesting output's queue */ > @@ -403,7 +421,7 @@ static void request_memblock(struct output *o, size_t length) { > /* If another thread already prepared some data we received > * the data over the asyncmsgq, hence let's first process > * it. */ > - while (pa_asyncmsgq_process_one(o->inq) > 0) > + while (pa_asyncmsgq_process_one(o->audio_inq) > 0) > ; > > /* Check whether we're now readable */ > @@ -514,12 +532,19 @@ static void sink_input_attach_cb(pa_sink_input *i) { > pa_assert_se(o = i->userdata); > > /* Set up the queue from the sink thread to us */ > - pa_assert(!o->inq_rtpoll_item_read && !o->outq_rtpoll_item_write); > + pa_assert(!o->audio_inq_rtpoll_item_read); > + pa_assert(!o->control_inq_rtpoll_item_read); > + pa_assert(!o->outq_rtpoll_item_write); > > - o->inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read( > + o->audio_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read( > i->sink->thread_info.rtpoll, > PA_RTPOLL_LATE, /* This one is not that important, since we check for data in _peek() anyway. */ > - o->inq); > + o->audio_inq); > + > + o->control_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read( > + i->sink->thread_info.rtpoll, > + PA_RTPOLL_NORMAL, > + o->control_inq); > > o->outq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write( > i->sink->thread_info.rtpoll, > @@ -559,9 +584,14 @@ static void sink_input_detach_cb(pa_sink_input *i) { > * pass any further data to this output */ > pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_REMOVE_OUTPUT, o, 0, NULL); > > - if (o->inq_rtpoll_item_read) { > - pa_rtpoll_item_free(o->inq_rtpoll_item_read); > - o->inq_rtpoll_item_read = NULL; > + if (o->audio_inq_rtpoll_item_read) { > + pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read); > + o->audio_inq_rtpoll_item_read = NULL; > + } > + > + if (o->control_inq_rtpoll_item_read) { > + pa_rtpoll_item_free(o->control_inq_rtpoll_item_read); > + o->control_inq_rtpoll_item_read = NULL; > } > > if (o->outq_rtpoll_item_write) { > @@ -756,16 +786,22 @@ static void output_add_within_thread(struct output *o) { > > PA_LLIST_PREPEND(struct output, o->userdata->thread_info.active_outputs, o); > > - pa_assert(!o->outq_rtpoll_item_read && !o->inq_rtpoll_item_write); > + pa_assert(!o->outq_rtpoll_item_read); > + pa_assert(!o->audio_inq_rtpoll_item_write); > + pa_assert(!o->control_inq_rtpoll_item_write); > > o->outq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read( > o->userdata->rtpoll, > PA_RTPOLL_EARLY-1, /* This item is very important */ > o->outq); > - o->inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write( > + o->audio_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write( > o->userdata->rtpoll, > PA_RTPOLL_EARLY, > - o->inq); > + o->audio_inq); > + o->control_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write( > + o->userdata->rtpoll, > + PA_RTPOLL_NORMAL, > + o->control_inq); > } > > /* Called from thread context of the io thread */ > @@ -780,9 +816,14 @@ static void output_remove_within_thread(struct output *o) { > o->outq_rtpoll_item_read = NULL; > } > > - if (o->inq_rtpoll_item_write) { > - pa_rtpoll_item_free(o->inq_rtpoll_item_write); > - o->inq_rtpoll_item_write = NULL; > + if (o->audio_inq_rtpoll_item_write) { > + pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write); > + o->audio_inq_rtpoll_item_write = NULL; > + } > + > + if (o->control_inq_rtpoll_item_write) { > + pa_rtpoll_item_free(o->control_inq_rtpoll_item_write); > + o->control_inq_rtpoll_item_write = NULL; > } > } > > @@ -803,7 +844,8 @@ static void sink_update_requested_latency(pa_sink *s) { > > /* Just hand this one over to all sink_inputs */ > PA_LLIST_FOREACH(o, u->thread_info.active_outputs) { > - pa_asyncmsgq_post(o->inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL, u->block_usec, NULL, NULL); > + pa_asyncmsgq_post(o->control_inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL, > + u->block_usec, NULL, NULL); > } > } > > @@ -977,7 +1019,8 @@ static struct output *output_new(struct userdata *u, pa_sink *sink) { > > o = pa_xnew0(struct output, 1); > o->userdata = u; > - o->inq = pa_asyncmsgq_new(0); > + o->audio_inq = pa_asyncmsgq_new(0); > + o->control_inq = pa_asyncmsgq_new(0); > o->outq = pa_asyncmsgq_new(0); > o->sink = sink; > o->memblockq = pa_memblockq_new( > @@ -1004,18 +1047,26 @@ static void output_free(struct output *o) { > output_disable(o); > update_description(o->userdata); > > - if (o->inq_rtpoll_item_read) > - pa_rtpoll_item_free(o->inq_rtpoll_item_read); > - if (o->inq_rtpoll_item_write) > - pa_rtpoll_item_free(o->inq_rtpoll_item_write); > + if (o->audio_inq_rtpoll_item_read) > + pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read); > + if (o->audio_inq_rtpoll_item_write) > + pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write); > + > + if (o->control_inq_rtpoll_item_read) > + pa_rtpoll_item_free(o->control_inq_rtpoll_item_read); > + if (o->control_inq_rtpoll_item_write) > + pa_rtpoll_item_free(o->control_inq_rtpoll_item_write); > > if (o->outq_rtpoll_item_read) > pa_rtpoll_item_free(o->outq_rtpoll_item_read); > if (o->outq_rtpoll_item_write) > pa_rtpoll_item_free(o->outq_rtpoll_item_write); > > - if (o->inq) > - pa_asyncmsgq_unref(o->inq); > + if (o->audio_inq) > + pa_asyncmsgq_unref(o->audio_inq); > + > + if (o->control_inq) > + pa_asyncmsgq_unref(o->control_inq); > > if (o->outq) > pa_asyncmsgq_unref(o->outq); > @@ -1068,7 +1119,8 @@ static void output_disable(struct output *o) { > > /* Finally, drop all queued data */ > pa_memblockq_flush_write(o->memblockq, true); > - pa_asyncmsgq_flush(o->inq, false); > + pa_asyncmsgq_flush(o->audio_inq, false); > + pa_asyncmsgq_flush(o->control_inq, false); > pa_asyncmsgq_flush(o->outq, false); > } > Hi Tanu, couldn't you just replace the pa_asyncmsgq_post() with pa_asyncmsgq_send() in sink_update_requested_latency()? Then the latency updates would be done synchronously and the situation you describe above should never happen. Regards Georg