The corking logic of module-loopback was incorrectly implemented. If you suspended the source, the sink input would be corked. When the sink was then suspended because it was idle, the source output was also corked. If you then moved the source output away from the suspended source, module-loopback would not start the stream again, because the sink input and the source output were both corked. The same applied if the sink was suspended. This patch corrects this behavior. It also uncorks the sink input or source output if the destination source or sink of a move is suspended only because it is idle. This avoids unnecessary interruptions of the stream, which makes the latency reports used to correct the initial latency more reliable. The patch also takes profile switches into account, where sink and source become invalid at the same time. In this case, corking or uncorking must be delayed until the appropriate attach callback is invoked. --- src/modules/module-loopback.c | 98 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 81 insertions(+), 17 deletions(-) diff --git a/src/modules/module-loopback.c b/src/modules/module-loopback.c index 8cbb05e..6cd7301 100644 --- a/src/modules/module-loopback.c +++ b/src/modules/module-loopback.c @@ -103,8 +103,13 @@ struct userdata { pa_usec_t sink_timestamp; } latency_snapshot; - /* Input thread variable */ - int64_t send_counter; + /* Input thread variables */ + struct { + int64_t send_counter; + + bool delayed_source_output_should_cork; + bool delayed_source_output_state_change_required; + } input_thread_info; /* Output thread variables */ struct { @@ -117,6 +122,8 @@ struct userdata { bool pop_adjust; bool first_pop_done; bool push_called; + bool delayed_sink_input_should_cork; + bool delayed_sink_input_state_change_required; } output_thread_info; }; @@ -391,7 +398,7 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) current_source_latency = pa_source_get_latency_within_thread(u->source_output->source); 
pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_POST, PA_UINT_TO_PTR(current_source_latency), push_time, chunk, NULL); - u->send_counter += (int64_t) chunk->length; + u->input_thread_info.send_counter += (int64_t) chunk->length; } /* Called from input thread context */ @@ -403,7 +410,7 @@ static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) pa_assert_se(u = o->userdata); pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL); - u->send_counter -= (int64_t) nbytes; + u->input_thread_info.send_counter -= (int64_t) nbytes; } /* Called from input thread context */ @@ -417,7 +424,7 @@ static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, length = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq); - u->latency_snapshot.send_counter = u->send_counter; + u->latency_snapshot.send_counter = u->input_thread_info.send_counter; /* Add content of delay memblockq to the source latency */ u->latency_snapshot.source_latency = pa_source_get_latency_within_thread(u->source_output->source) + pa_bytes_to_usec(length, &u->source_output->source->sample_spec); @@ -480,6 +487,15 @@ static void source_output_attach_cb(pa_source_output *o) { o->source->thread_info.rtpoll, PA_RTPOLL_LATE, u->asyncmsgq); + + /* Delayed state schange if necessary */ + if (u->input_thread_info.delayed_source_output_state_change_required) { + u->input_thread_info.delayed_source_output_state_change_required = false; + if (u->input_thread_info.delayed_source_output_should_cork) + pa_source_output_set_state_within_thread(o, PA_SOURCE_OUTPUT_CORKED); + else + pa_source_output_set_state_within_thread(o, PA_SOURCE_OUTPUT_RUNNING); + } } /* Called from input thread context */ @@ -566,10 +582,20 @@ static void source_output_moving_cb(pa_source_output *o, pa_source *dest) { set_source_output_latency(u, dest); update_effective_source_latency(u, dest, 
u->sink_input->sink); - if (pa_source_get_state(dest) == PA_SOURCE_SUSPENDED) - pa_sink_input_cork(u->sink_input, true); - else - pa_sink_input_cork(u->sink_input, false); + /* Cork the sink input if the destination is suspended for other + * reasons than idle. If the sink is not valid, delay corking change + * until the sink is attached again */ + if (u->sink_input->sink) { + if (pa_source_get_state(dest) == PA_SOURCE_SUSPENDED) + pa_sink_input_cork(u->sink_input, (dest->suspend_cause != PA_SUSPEND_IDLE)); + else + pa_sink_input_cork(u->sink_input, false); + } else { + /* If the sink is invalid, we have to delay the corking change until the sink input attach callback. + * This should only happen during a profile switch which involves source and sink */ + u->output_thread_info.delayed_sink_input_should_cork = (pa_source_get_state(dest) == PA_SOURCE_SUSPENDED) && (dest->suspend_cause != PA_SUSPEND_IDLE); + u->output_thread_info.delayed_sink_input_state_change_required = true; + } update_adjust_timer(u); @@ -598,11 +624,20 @@ static void source_output_suspend_cb(pa_source_output *o, bool suspended) { else u->output_thread_info.push_called = false; - } else + /* Do not cork the sink input if the source is suspended + * because the sink was suspended and the source output + * corked previously */ + if (pa_source_output_get_state(u->source_output) != PA_SOURCE_OUTPUT_CORKED) + pa_sink_input_cork(u->sink_input, true); + + } else { /* Get effective source latency on unsuspend */ update_effective_source_latency(u, u->source_output->source, u->sink_input->sink); - pa_sink_input_cork(u->sink_input, suspended); + /* Only uncork the sink input, if the sink is valid */ + if (u->sink_input->sink) + pa_sink_input_cork(u->sink_input, false); + } update_adjust_timer(u); } @@ -808,6 +843,15 @@ static void sink_input_attach_cb(pa_sink_input *i) { pa_memblockq_set_prebuf(u->memblockq, pa_sink_input_get_max_request(i)*2); pa_memblockq_set_maxrewind(u->memblockq, 
pa_sink_input_get_max_rewind(i)); + + /* Delayed state schange if necessary */ + if (u->output_thread_info.delayed_sink_input_state_change_required) { + u->output_thread_info.delayed_sink_input_state_change_required = false; + if (u->output_thread_info.delayed_sink_input_should_cork) + pa_sink_input_set_state_within_thread(i, PA_SINK_INPUT_CORKED); + else + pa_sink_input_set_state_within_thread(i, PA_SINK_INPUT_RUNNING); + } } /* Called from output thread context */ @@ -896,10 +940,19 @@ static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) { set_sink_input_latency(u, dest); update_effective_source_latency(u, u->source_output->source, dest); - if (pa_sink_get_state(dest) == PA_SINK_SUSPENDED) - pa_source_output_cork(u->source_output, true); - else - pa_source_output_cork(u->source_output, false); + /* Cork the source output if the destination is suspended for other + * reasons than idle */ + if (u->source_output->source) { + if (pa_sink_get_state(dest) == PA_SINK_SUSPENDED) + pa_source_output_cork(u->source_output, (dest->suspend_cause != PA_SUSPEND_IDLE)); + else + pa_source_output_cork(u->source_output, false); + } else { + /* If the source is invalid, we have to delay the corking change until the source output attach callback. 
+ * This should only happen during a profile switch which involves source and sink */ + u->input_thread_info.delayed_source_output_should_cork = (pa_sink_get_state(dest) == PA_SINK_SUSPENDED) && (dest->suspend_cause != PA_SUSPEND_IDLE); + u->input_thread_info.delayed_source_output_state_change_required = true; + } update_adjust_timer(u); @@ -936,11 +989,20 @@ static void sink_input_suspend_cb(pa_sink_input *i, bool suspended) { u->output_thread_info.pop_called = false; u->output_thread_info.first_pop_done = false; - } else + /* Do not cork the source output if the sink is suspended + * because the source was suspended and the sink input + * corked previously */ + if (pa_sink_input_get_state(u->sink_input) != PA_SINK_INPUT_CORKED) + pa_source_output_cork(u->source_output, true); + + } else { /* Set effective source latency on unsuspend */ update_effective_source_latency(u, u->source_output->source, u->sink_input->sink); - pa_source_output_cork(u->source_output, suspended); + /* Only uncork the source output if the source is valid */ + if (u->source_output->source) + pa_source_output_cork(u->source_output, false); + } update_adjust_timer(u); } @@ -1046,6 +1108,8 @@ int pa__init(pa_module *m) { u->output_thread_info.pop_called = false; u->output_thread_info.pop_adjust = false; u->output_thread_info.push_called = false; + u->output_thread_info.delayed_sink_input_state_change_required = false; + u->input_thread_info.delayed_source_output_state_change_required = false; adjust_time_sec = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC; if (pa_modargs_get_value_u32(ma, "adjust_time", &adjust_time_sec) < 0) { -- 2.10.1