The current code does not make any attempt to initialize the end-to-end latency to a value near the desired latency. This leads to underruns at startup because the memblockq is initially empty and to very long adjustment times for long latencies because the end-to-end latency at startup is significantly shorter than the desired value. This patch initializes the memblockq at startup and during source or sink changes so that the end-to-end latency will be near the configured value. It also ensures that there are no underruns if the source is slow to start and that the latency does not grow too much when the sink is slow to start by adjusting the length of the memblockq until the source has called push for the first time and the sink has called pop for the second time. Waiting for the second pop is necessary because the sink has not been started when the first pop is called. --- src/modules/module-loopback.c | 317 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 295 insertions(+), 22 deletions(-) diff --git a/src/modules/module-loopback.c b/src/modules/module-loopback.c index 2907bbc..7a0574c 100644 --- a/src/modules/module-loopback.c +++ b/src/modules/module-loopback.c @@ -61,6 +61,8 @@ PA_MODULE_USAGE( #define MEMBLOCKQ_MAXLENGTH (1024*1024*32) +#define MIN_DEVICE_LATENCY (2.5*PA_USEC_PER_MSEC) + #define DEFAULT_ADJUST_TIME_USEC (10*PA_USEC_PER_SEC) struct userdata { @@ -82,9 +84,24 @@ struct userdata { int64_t send_counter; size_t skip; + size_t first_chunk_length; pa_usec_t latency; + /* Latency boundaries and current values */ + pa_usec_t min_source_latency; + pa_usec_t max_source_latency; + pa_usec_t min_sink_latency; + pa_usec_t max_sink_latency; + pa_usec_t configured_sink_latency; + pa_usec_t configured_source_latency; + pa_usec_t effective_source_latency; + + /* Various booleans */ bool in_pop; + bool pop_called; + bool first_pop_done; + bool push_called; + bool push_called_in_thread; struct { int64_t send_counter; @@ -96,6 +113,16 @@ struct 
userdata { pa_usec_t sink_latency; pa_usec_t sink_timestamp; } latency_snapshot; + + /* This struct is filled by the input thread during + * the initial push and/or pop */ + struct { + pa_usec_t push_source_latency; + pa_usec_t source_timestamp; + + pa_usec_t pop_source_latency; + } source_snapshot; + }; static const char* const valid_modargs[] = { @@ -118,11 +145,15 @@ static const char* const valid_modargs[] = { enum { SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX, SINK_INPUT_MESSAGE_REWIND, - SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT + SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, + SINK_INPUT_MESSAGE_SINK_CHANGED, + SINK_INPUT_MESSAGE_SOURCE_CHANGED }; enum { SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT = PA_SOURCE_OUTPUT_MESSAGE_MAX, + SOURCE_OUTPUT_MESSAGE_SOURCE_CHANGED, + SOURCE_OUTPUT_MESSAGE_GET_LATENCY }; static void enable_adjust_timer(struct userdata *u, bool enable); @@ -279,6 +310,65 @@ static void update_adjust_timer(struct userdata *u) { enable_adjust_timer(u, true); } +/* Called from main thread + * Calculates minimum and maximum possible latency for source and sink */ +static void update_latency_boundaries(struct userdata *u, pa_source *source, pa_sink *sink) { + + if (source) { + /* Source latencies */ + if (source->flags & PA_SOURCE_DYNAMIC_LATENCY) + pa_source_get_latency_range(source, &u->min_source_latency, &u->max_source_latency); + else { + u->min_source_latency = pa_source_get_fixed_latency(source); + u->max_source_latency = u->min_source_latency; + } + /* Latencies below 2.5 ms cause problems, limit source latency if possible */ + if (u->max_source_latency >= MIN_DEVICE_LATENCY) + u->min_source_latency = PA_MAX(u->min_source_latency, MIN_DEVICE_LATENCY); + else + u->min_source_latency = u->max_source_latency; + } + + if (sink) { + /* Sink latencies */ + if (sink->flags & PA_SINK_DYNAMIC_LATENCY) + pa_sink_get_latency_range(sink, &u->min_sink_latency, &u->max_sink_latency); + else { + u->min_sink_latency = pa_sink_get_fixed_latency(sink); + 
u->max_sink_latency = u->min_sink_latency; + } + /* Latencies below 2.5 ms cause problems, limit sink latency if possible */ + if (u->max_sink_latency >= MIN_DEVICE_LATENCY) + u->min_sink_latency = PA_MAX(u->min_sink_latency, MIN_DEVICE_LATENCY); + else + u->min_sink_latency = u->max_sink_latency; + } +} + +/* Called from output context + * Sets the memblockq to the configured latency corrected by latency_offset_usec */ +static void memblockq_adjust(struct userdata *u, pa_usec_t latency_offset_usec, bool allow_push) { + size_t current_memblockq_length, requested_memblockq_length, buffer_correction; + pa_usec_t requested_buffer_latency; + + requested_buffer_latency = PA_CLIP_SUB(u->latency, latency_offset_usec); + requested_memblockq_length = pa_usec_to_bytes(requested_buffer_latency, &u->sink_input->sample_spec); + current_memblockq_length = pa_memblockq_get_length(u->memblockq); + + if (current_memblockq_length > requested_memblockq_length) { + /* Drop audio from queue */ + buffer_correction = current_memblockq_length - requested_memblockq_length; + pa_log_info("Dropping %lu usec of audio from queue", pa_bytes_to_usec(buffer_correction, &u->sink_input->sample_spec)); + pa_memblockq_drop(u->memblockq, buffer_correction); + + } else if (current_memblockq_length < requested_memblockq_length && allow_push) { + /* Add silence to queue */ + buffer_correction = requested_memblockq_length - current_memblockq_length; + pa_log_info("Adding %lu usec of silence to queue", pa_bytes_to_usec(buffer_correction, &u->sink_input->sample_spec)); + pa_memblockq_seek(u->memblockq, (int64_t)buffer_correction, PA_SEEK_RELATIVE, true); + } +} + /* Called from input thread context */ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) { struct userdata *u; @@ -302,6 +392,12 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) chunk = &copy; } + if (!u->push_called_in_thread) { + u->source_snapshot.source_timestamp = pa_rtclock_now(); + 
u->source_snapshot.push_source_latency = pa_source_get_latency_within_thread(u->source_output->source); + u->push_called_in_thread = true; + } + pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, chunk, NULL); u->send_counter += (int64_t) chunk->length; } @@ -337,11 +433,55 @@ static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, return 0; } + + case SOURCE_OUTPUT_MESSAGE_SOURCE_CHANGED: + + u->push_called_in_thread = false; + + return 0; + + case SOURCE_OUTPUT_MESSAGE_GET_LATENCY: + + u->source_snapshot.pop_source_latency = pa_source_get_latency_within_thread(u->source_output->source); + + return 0; } return pa_source_output_process_msg(obj, code, data, offset, chunk); } +/* Called from main thread. + * Get current effective latency of the source. If the source is in use with + * smaller latency than the configured latency, it will continue running with + * the smaller value when the source output is switched to the source. */ +static void get_effective_source_latency(struct userdata *u, pa_source *source) { + + if (!source) { + u->effective_source_latency = u->configured_source_latency; + return; + } + u->effective_source_latency = pa_source_get_requested_latency(source); + if (u->effective_source_latency == 0 || u->effective_source_latency > u->configured_source_latency) + u->effective_source_latency = u->configured_source_latency; +} + +/* Called from main thread. + * Set source output latency to one third of the overall latency if possible. + * The choice of one third is rather arbitrary somewhere between the minimum + * possible latency (which would cause a lot of CPU load) and half the configured + * latency (which would lead to an empty memblockq if the sink is configured + * likewise). 
*/ +static void set_source_output_latency(struct userdata *u, pa_source *source) { + pa_usec_t latency, requested_latency; + + requested_latency = u->latency / 3; + + latency = PA_CLAMP(requested_latency , u->min_source_latency, u->max_source_latency); + u->configured_source_latency = pa_source_output_set_requested_latency(u->source_output, latency); + if (u->configured_source_latency != requested_latency) + pa_log_warn("Cannot set requested source latency of %0.2f ms, adjusting to %0.2f ms", (double)requested_latency / PA_USEC_PER_MSEC, (double)u->configured_source_latency / PA_USEC_PER_MSEC); +} + /* Called from input thread context */ static void source_output_attach_cb(pa_source_output *o) { struct userdata *u; @@ -435,12 +575,28 @@ static void source_output_moving_cb(pa_source_output *o, pa_source *dest) { if ((n = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_ICON_NAME))) pa_sink_input_set_property(u->sink_input, PA_PROP_DEVICE_ICON_NAME, n); - if (pa_source_get_state(dest) == PA_SOURCE_SUSPENDED) - pa_sink_input_cork(u->sink_input, true); - else + /* Set latency and calculate latency limits */ + update_latency_boundaries(u, dest, NULL); + set_source_output_latency(u, dest); + get_effective_source_latency(u, dest); + + if (pa_source_get_state(dest) == PA_SOURCE_SUSPENDED) { + if (dest->suspend_cause != PA_SUSPEND_IDLE) + pa_sink_input_cork(u->sink_input, true); + } else pa_sink_input_cork(u->sink_input, false); update_adjust_timer(u); + + /* Send a message to the output thread that the source has changed. + * If the sink is invalid here during a profile switching situation + * we can safely set push_called to false directly. 
*/ + if (u->sink_input->sink) + pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL); + else + u->push_called = false; + + pa_asyncmsgq_send(dest->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL); } /* Called from main thread */ @@ -451,6 +607,21 @@ static void source_output_suspend_cb(pa_source_output *o, bool suspended) { pa_assert_ctl_context(); pa_assert_se(u = o->userdata); + /* If the source output has been suspended, we need to handle this like + * a source change when the source output is resumed */ + if (suspended) { + if (u->sink_input->sink) + pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL); + else + u->push_called = false; + + /* Since the source is suspended, we can set push_called_in_thread directly */ + u->push_called_in_thread = false; + + } else + /* Get effective source latency on unsuspend */ + get_effective_source_latency(u, u->source_output->source); + pa_sink_input_cork(u->sink_input, suspended); update_adjust_timer(u); @@ -465,11 +636,35 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk pa_assert_se(u = i->userdata); pa_assert(chunk); + /* It seems necessary to handle outstanding push messages here, though it is not clear + * why. Removing this part leads to underruns when low latencies are configured. */ u->in_pop = true; while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0) ; u->in_pop = false; + /* While pop has not been called, latency adjustments in SINK_INPUT_MESSAGE_POST are + * enabled. Disable them on second pop. We are waiting for the second pop, because + * the first pop is called before the sink is actually started. */ + if (!u->pop_called && u->first_pop_done) { + pa_usec_t time_delta; + + /* If the source is valid, get the latency. 
Else we can safely set pop_source_latency + * to zero, it does not matter since the final adjustment will be done when the source + * becomes valid */ + if (u->source_output->source) + pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_GET_LATENCY, NULL, 0, NULL); + else + u->source_snapshot.pop_source_latency = 0; + + time_delta = u->source_snapshot.pop_source_latency; + time_delta += pa_sink_get_latency_within_thread(u->sink_input->sink); + time_delta += pa_bytes_to_usec(u->first_chunk_length, &u->sink_input->sample_spec); + + memblockq_adjust(u, time_delta, true); + u->pop_called = true; + } + if (pa_memblockq_peek(u->memblockq, chunk) < 0) { pa_log_info("Could not peek into queue"); return -1; @@ -478,6 +673,18 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk chunk->length = PA_MIN(chunk->length, nbytes); pa_memblockq_drop(u->memblockq, chunk->length); + /* We need to know on the second pop how much data was already dropped during the first pop */ + if (!u->first_pop_done) { + u->first_chunk_length = chunk->length; + u->first_pop_done = true; + } + + /* If push has not been called yet, assume that the source will deliver one full latency + * when it starts pushing. Adjust the memblockq accordingly and ensure that there is + * enough data in the queue to avoid underruns. 
*/ + if (!u->push_called) + memblockq_adjust(u, u->effective_source_latency, true); + return 0; } @@ -496,13 +703,13 @@ static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) { static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct userdata *u = PA_SINK_INPUT(obj)->userdata; + pa_sink_input_assert_io_context(u->sink_input); + switch (code) { case PA_SINK_INPUT_MESSAGE_GET_LATENCY: { pa_usec_t *r = data; - pa_sink_input_assert_io_context(u->sink_input); - *r = pa_bytes_to_usec(pa_memblockq_get_length(u->memblockq), &u->sink_input->sample_spec); /* Fall through, the default handler will add in the extra @@ -512,16 +719,30 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in case SINK_INPUT_MESSAGE_POST: - pa_sink_input_assert_io_context(u->sink_input); + /* If push has not been called yet, latency adjustments in sink_input_pop_cb() + * are enabled. Disable them on first push. Also correct the memblockq */ + if (!u->push_called) { + pa_usec_t time_delta; + + time_delta = u->source_snapshot.push_source_latency; + time_delta += pa_rtclock_now() - u->source_snapshot.source_timestamp; + time_delta += pa_sink_get_latency_within_thread(u->sink_input->sink); + + memblockq_adjust(u, time_delta, true); + u->push_called = true; + } + + pa_memblockq_push_align(u->memblockq, chunk); - if (PA_SINK_IS_OPENED(u->sink_input->sink->thread_info.state)) - pa_memblockq_push_align(u->memblockq, chunk); - else - pa_memblockq_flush_write(u->memblockq, true); + /* If pop has not been called yet, make sure the latency does not grow too much. + * Don't push any silence here, because we already have new data in the queue */ + if (!u->pop_called) + memblockq_adjust(u, pa_sink_get_latency_within_thread(u->sink_input->sink), false); /* Is this the end of an underrun? 
Then let's start things * right-away */ if (!u->in_pop && + pa_sink_get_state(u->sink_input->sink) != PA_SINK_SUSPENDED && u->sink_input->thread_info.underrun_for > 0 && pa_memblockq_is_readable(u->memblockq)) { @@ -537,12 +758,7 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in case SINK_INPUT_MESSAGE_REWIND: - pa_sink_input_assert_io_context(u->sink_input); - - if (PA_SINK_IS_OPENED(u->sink_input->sink->thread_info.state)) - pa_memblockq_seek(u->memblockq, -offset, PA_SEEK_RELATIVE, true); - else - pa_memblockq_flush_write(u->memblockq, true); + pa_memblockq_seek(u->memblockq, -offset, PA_SEEK_RELATIVE, true); u->recv_counter -= offset; @@ -562,10 +778,39 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in return 0; } + + case SINK_INPUT_MESSAGE_SOURCE_CHANGED: + + u->push_called = false; + + return 0; + + case SINK_INPUT_MESSAGE_SINK_CHANGED: + + u->pop_called = false; + u->first_pop_done = false; + + return 0; } return pa_sink_input_process_msg(obj, code, data, offset, chunk); } +/* Called from main thread. + * Set sink input latency to one third of the overall latency if possible. + * The choice of one third is rather arbitrary somewhere between the minimum + * possible latency (which would cause a lot of CPU load) and half the configured + * latency (which would lead to an empty memblockq if the source is configured + * likewise). 
*/ +static void set_sink_input_latency(struct userdata *u, pa_sink *sink) { + pa_usec_t latency, requested_latency; + + requested_latency = u->latency / 3; + + latency = PA_CLAMP(requested_latency , u->min_sink_latency, u->max_sink_latency); + u->configured_sink_latency = pa_sink_input_set_requested_latency(u->sink_input, latency); + if (u->configured_sink_latency != requested_latency) + pa_log_warn("Cannot set requested sink latency of %0.2f ms, adjusting to %0.2f ms", (double)requested_latency / PA_USEC_PER_MSEC, (double)u->configured_sink_latency / PA_USEC_PER_MSEC); +} /* Called from output thread context */ static void sink_input_attach_cb(pa_sink_input *i) { @@ -665,12 +910,21 @@ static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) { if ((n = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_ICON_NAME))) pa_source_output_set_property(u->source_output, PA_PROP_MEDIA_ICON_NAME, n); - if (pa_sink_get_state(dest) == PA_SINK_SUSPENDED) - pa_source_output_cork(u->source_output, true); - else + /* Set latency and calculate latency limits */ + update_latency_boundaries(u, NULL, dest); + set_sink_input_latency(u, dest); + get_effective_source_latency(u, u->source_output->source); + + if (pa_sink_get_state(dest) == PA_SINK_SUSPENDED) { + if (dest->suspend_cause != PA_SUSPEND_IDLE) + pa_source_output_cork(u->source_output, true); + } else pa_source_output_cork(u->source_output, false); update_adjust_timer(u); + + /* Send a message to the output thread that the sink has changed */ + pa_asyncmsgq_send(dest->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SINK_CHANGED, NULL, 0, NULL); } /* Called from main thread */ @@ -695,6 +949,16 @@ static void sink_input_suspend_cb(pa_sink_input *i, bool suspended) { pa_assert_ctl_context(); pa_assert_se(u = i->userdata); + /* If the sink input has been suspended, we need to handle this like + * a sink change when the sink input is resumed. 
Because the sink input + * is suspended, we can set the variables directly. */ + if (suspended) { + u->pop_called = false; + u->first_pop_done = false; + } else + /* Set effective source latency on unsuspend */ + get_effective_source_latency(u, u->source_output->source); + pa_source_output_cork(u->source_output, suspended); update_adjust_timer(u); @@ -798,6 +1062,9 @@ int pa__init(pa_module *m) { u->core = m->core; u->module = m; u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC; + u->pop_called = false; + u->push_called = false; + u->push_called_in_thread = false; adjust_time_sec = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC; if (pa_modargs_get_value_u32(ma, "adjust_time", &adjust_time_sec) < 0) { @@ -876,7 +1143,8 @@ int pa__init(pa_module *m) { u->sink_input->suspend = sink_input_suspend_cb; u->sink_input->userdata = u; - pa_sink_input_set_requested_latency(u->sink_input, u->latency/3); + update_latency_boundaries(u, NULL, u->sink_input->sink); + set_sink_input_latency(u, u->sink_input->sink); pa_source_output_new_data_init(&source_output_data); source_output_data.driver = __FILE__; @@ -927,7 +1195,8 @@ int pa__init(pa_module *m) { u->source_output->suspend = source_output_suspend_cb; u->source_output->userdata = u; - pa_source_output_set_requested_latency(u->source_output, u->latency/3); + update_latency_boundaries(u, u->source_output->source, u->sink_input->sink); + set_source_output_latency(u, u->source_output->source); pa_sink_input_get_silence(u->sink_input, &silence); u->memblockq = pa_memblockq_new( @@ -941,6 +1210,8 @@ int pa__init(pa_module *m) { 0, /* maxrewind */ &silence); /* silence frame */ pa_memblock_unref(silence.memblock); + /* Fill the memblockq with silence */ + pa_memblockq_seek(u->memblockq, pa_usec_to_bytes(u->latency, &u->sink_input->sample_spec), PA_SEEK_RELATIVE, true); u->asyncmsgq = pa_asyncmsgq_new(0); if (!u->asyncmsgq) { @@ -964,6 +1235,8 @@ int pa__init(pa_module *m) { && (n = 
pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_ICON_NAME))) pa_proplist_sets(u->sink_input->proplist, PA_PROP_MEDIA_ICON_NAME, n); + get_effective_source_latency(u, u->source_output->source); + pa_sink_input_put(u->sink_input); pa_source_output_put(u->source_output); -- 2.10.1