The current code does not make any attempt to initialize the end-to-end latency to a value near the desired latency. This leads to underruns at startup because the memblockq is initially empty and to very long adjustment times for long latencies because the end-to-end latency at startup is significantly shorter than the desired value. This patch initializes the memblockq at startup and during source or sink changes so that the end-to-end latency will be near the configured value. It also ensures that there are no underruns if the source is slow to start and that the latency does not grow too much when the sink is slow to start by adjusting the length of the memblockq until the source has called push for the first time and the sink has called pop for the second time. Waiting for the second pop is necessary because the sink has not been started when the first pop is called. For clarity, variables have been separated into input, output and main thread variables. --- src/modules/module-loopback.c | 324 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 290 insertions(+), 34 deletions(-) diff --git a/src/modules/module-loopback.c b/src/modules/module-loopback.c index 2907bbc..a5705d5 100644 --- a/src/modules/module-loopback.c +++ b/src/modules/module-loopback.c @@ -61,6 +61,8 @@ PA_MODULE_USAGE( #define MEMBLOCKQ_MAXLENGTH (1024*1024*32) +#define MIN_DEVICE_LATENCY (2.5*PA_USEC_PER_MSEC) + #define DEFAULT_ADJUST_TIME_USEC (10*PA_USEC_PER_SEC) struct userdata { @@ -78,14 +80,18 @@ struct userdata { pa_time_event *time_event; pa_usec_t adjust_time; - int64_t recv_counter; - int64_t send_counter; - size_t skip; pa_usec_t latency; - bool in_pop; + /* Latency boundaries and current values */ + pa_usec_t min_source_latency; + pa_usec_t max_source_latency; + pa_usec_t min_sink_latency; + pa_usec_t max_sink_latency; + pa_usec_t configured_sink_latency; + pa_usec_t configured_source_latency; + /* Used for sink input and source output snapshots */ struct { int64_t 
send_counter; pa_usec_t source_latency; @@ -96,6 +102,22 @@ struct userdata { pa_usec_t sink_latency; pa_usec_t sink_timestamp; } latency_snapshot; + + /* Input thread variable */ + int64_t send_counter; + + /* Output thread variables */ + struct { + int64_t recv_counter; + pa_usec_t effective_source_latency; + + /* Various booleans */ + bool in_pop; + bool pop_called; + bool pop_adjust; + bool first_pop_done; + bool push_called; + } output_thread_info; }; static const char* const valid_modargs[] = { @@ -118,11 +140,14 @@ static const char* const valid_modargs[] = { enum { SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX, SINK_INPUT_MESSAGE_REWIND, - SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT + SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, + SINK_INPUT_MESSAGE_SINK_CHANGED, + SINK_INPUT_MESSAGE_SOURCE_CHANGED, + SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY }; enum { - SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT = PA_SOURCE_OUTPUT_MESSAGE_MAX, + SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT = PA_SOURCE_OUTPUT_MESSAGE_MAX }; static void enable_adjust_timer(struct userdata *u, bool enable); @@ -279,10 +304,70 @@ static void update_adjust_timer(struct userdata *u) { enable_adjust_timer(u, true); } +/* Called from main thread + * Calculates minimum and maximum possible latency for source and sink */ +static void update_latency_boundaries(struct userdata *u, pa_source *source, pa_sink *sink) { + + if (source) { + /* Source latencies */ + if (source->flags & PA_SOURCE_DYNAMIC_LATENCY) + pa_source_get_latency_range(source, &u->min_source_latency, &u->max_source_latency); + else { + u->min_source_latency = pa_source_get_fixed_latency(source); + u->max_source_latency = u->min_source_latency; + } + /* Latencies below 2.5 ms cause problems, limit source latency if possible */ + if (u->max_source_latency >= MIN_DEVICE_LATENCY) + u->min_source_latency = PA_MAX(u->min_source_latency, MIN_DEVICE_LATENCY); + else + u->min_source_latency = u->max_source_latency; + } + + if (sink) { + /* Sink latencies */ 
+ if (sink->flags & PA_SINK_DYNAMIC_LATENCY) + pa_sink_get_latency_range(sink, &u->min_sink_latency, &u->max_sink_latency); + else { + u->min_sink_latency = pa_sink_get_fixed_latency(sink); + u->max_sink_latency = u->min_sink_latency; + } + /* Latencies below 2.5 ms cause problems, limit sink latency if possible */ + if (u->max_sink_latency >= MIN_DEVICE_LATENCY) + u->min_sink_latency = PA_MAX(u->min_sink_latency, MIN_DEVICE_LATENCY); + else + u->min_sink_latency = u->max_sink_latency; + } +} + +/* Called from output context + * Sets the memblockq to the configured latency corrected by latency_offset_usec */ +static void memblockq_adjust(struct userdata *u, pa_usec_t latency_offset_usec, bool allow_push) { + size_t current_memblockq_length, requested_memblockq_length, buffer_correction; + pa_usec_t requested_buffer_latency; + + requested_buffer_latency = PA_CLIP_SUB(u->latency, latency_offset_usec); + requested_memblockq_length = pa_usec_to_bytes(requested_buffer_latency, &u->sink_input->sample_spec); + current_memblockq_length = pa_memblockq_get_length(u->memblockq); + + if (current_memblockq_length > requested_memblockq_length) { + /* Drop audio from queue */ + buffer_correction = current_memblockq_length - requested_memblockq_length; + pa_log_info("Dropping %lu usec of audio from queue", pa_bytes_to_usec(buffer_correction, &u->sink_input->sample_spec)); + pa_memblockq_drop(u->memblockq, buffer_correction); + + } else if (current_memblockq_length < requested_memblockq_length && allow_push) { + /* Add silence to queue */ + buffer_correction = requested_memblockq_length - current_memblockq_length; + pa_log_info("Adding %lu usec of silence to queue", pa_bytes_to_usec(buffer_correction, &u->sink_input->sample_spec)); + pa_memblockq_seek(u->memblockq, (int64_t)buffer_correction, PA_SEEK_RELATIVE, true); + } +} + /* Called from input thread context */ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) { struct userdata *u; pa_memchunk 
copy; + pa_usec_t push_time, current_source_latency; pa_source_output_assert_ref(o); pa_source_output_assert_io_context(o); @@ -302,7 +387,11 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) chunk = &copy; } - pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, chunk, NULL); + /* Send current source latency and timestamp with the message */ + push_time = pa_rtclock_now(); + current_source_latency = pa_source_get_latency_within_thread(u->source_output->source); + + pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_POST, PA_UINT_TO_PTR(current_source_latency), push_time, chunk, NULL); u->send_counter += (int64_t) chunk->length; } @@ -342,6 +431,45 @@ static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, return pa_source_output_process_msg(obj, code, data, offset, chunk); } +/* Called from main thread. + * Get current effective latency of the source. If the source is in use with + * smaller latency than the configured latency, it will continue running with + * the smaller value when the source output is switched to the source.
*/ +static void get_effective_source_latency(struct userdata *u, pa_source *source, pa_sink *sink) { + pa_usec_t effective_source_latency; + + effective_source_latency = u->configured_source_latency; + + if (source) { + effective_source_latency = pa_source_get_requested_latency(source); + if (effective_source_latency == 0 || effective_source_latency > u->configured_source_latency) + effective_source_latency = u->configured_source_latency; + } + + /* If the sink is valid, send a message to the output thread, else set the variable directly */ + if (sink) + pa_asyncmsgq_send(sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY, NULL, (int64_t)effective_source_latency, NULL); + else + u->output_thread_info.effective_source_latency = effective_source_latency; +} + +/* Called from main thread. + * Set source output latency to one third of the overall latency if possible. + * The choice of one third is rather arbitrary somewhere between the minimum + * possible latency (which would cause a lot of CPU load) and half the configured + * latency (which would lead to an empty memblockq if the sink is configured + * likewise). 
*/ +static void set_source_output_latency(struct userdata *u, pa_source *source) { + pa_usec_t latency, requested_latency; + + requested_latency = u->latency / 3; + + latency = PA_CLAMP(requested_latency , u->min_source_latency, u->max_source_latency); + u->configured_source_latency = pa_source_output_set_requested_latency(u->source_output, latency); + if (u->configured_source_latency != requested_latency) + pa_log_warn("Cannot set requested source latency of %0.2f ms, adjusting to %0.2f ms", (double)requested_latency / PA_USEC_PER_MSEC, (double)u->configured_source_latency / PA_USEC_PER_MSEC); +} + /* Called from input thread context */ static void source_output_attach_cb(pa_source_output *o) { struct userdata *u; @@ -435,12 +563,26 @@ static void source_output_moving_cb(pa_source_output *o, pa_source *dest) { if ((n = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_ICON_NAME))) pa_sink_input_set_property(u->sink_input, PA_PROP_DEVICE_ICON_NAME, n); - if (pa_source_get_state(dest) == PA_SOURCE_SUSPENDED) - pa_sink_input_cork(u->sink_input, true); - else + /* Set latency and calculate latency limits */ + update_latency_boundaries(u, dest, NULL); + set_source_output_latency(u, dest); + get_effective_source_latency(u, dest, u->sink_input->sink); + + if (pa_source_get_state(dest) == PA_SOURCE_SUSPENDED) { + if (dest->suspend_cause != PA_SUSPEND_IDLE) + pa_sink_input_cork(u->sink_input, true); + } else pa_sink_input_cork(u->sink_input, false); update_adjust_timer(u); + + /* Send a message to the output thread that the source has changed. + * If the sink is invalid here during a profile switching situation + * we can safely set push_called to false directly.
*/ + if (u->sink_input->sink) + pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL); + else + u->output_thread_info.push_called = false; } /* Called from main thread */ @@ -451,6 +593,18 @@ static void source_output_suspend_cb(pa_source_output *o, bool suspended) { pa_assert_ctl_context(); pa_assert_se(u = o->userdata); + /* If the source output has been suspended, we need to handle this like + * a source change when the source output is resumed */ + if (suspended) { + if (u->sink_input->sink) + pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL); + else + u->output_thread_info.push_called = false; + + } else + /* Get effective source latency on unsuspend */ + get_effective_source_latency(u, u->source_output->source, u->sink_input->sink); + pa_sink_input_cork(u->sink_input, suspended); update_adjust_timer(u); @@ -465,10 +619,22 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk pa_assert_se(u = i->userdata); pa_assert(chunk); - u->in_pop = true; + /* It seems necessary to handle outstanding push messages here, though it is not clear + * why. Removing this part leads to underruns when low latencies are configured. */ + u->output_thread_info.in_pop = true; while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0) ; - u->in_pop = false; + u->output_thread_info.in_pop = false; + + /* While pop has not been called, latency adjustments in SINK_INPUT_MESSAGE_POST are + * enabled. Disable them on second pop and enable the final adjustment during the + * next push. We are waiting for the second pop, because the first pop is called + * before the sink is actually started. 
*/ + if (!u->output_thread_info.pop_called && u->output_thread_info.first_pop_done) { + u->output_thread_info.pop_adjust = true; + u->output_thread_info.pop_called = true; + } + u->output_thread_info.first_pop_done = true; if (pa_memblockq_peek(u->memblockq, chunk) < 0) { pa_log_info("Could not peek into queue"); @@ -478,6 +644,12 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk chunk->length = PA_MIN(chunk->length, nbytes); pa_memblockq_drop(u->memblockq, chunk->length); + /* If push has not been called yet, assume that the source will deliver one full latency + * when it starts pushing. Adjust the memblockq accordingly and ensure that there is + * enough data in the queue to avoid underruns. */ + if (!u->output_thread_info.push_called) + memblockq_adjust(u, u->output_thread_info.effective_source_latency, true); + return 0; } @@ -496,13 +668,13 @@ static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) { static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct userdata *u = PA_SINK_INPUT(obj)->userdata; + pa_sink_input_assert_io_context(u->sink_input); + switch (code) { case PA_SINK_INPUT_MESSAGE_GET_LATENCY: { pa_usec_t *r = data; - pa_sink_input_assert_io_context(u->sink_input); - *r = pa_bytes_to_usec(pa_memblockq_get_length(u->memblockq), &u->sink_input->sample_spec); /* Fall through, the default handler will add in the extra @@ -512,16 +684,41 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in case SINK_INPUT_MESSAGE_POST: - pa_sink_input_assert_io_context(u->sink_input); + pa_memblockq_push_align(u->memblockq, chunk); + + /* If push has not been called yet, latency adjustments in sink_input_pop_cb() + * are enabled. Disable them on first push and correct the memblockq. 
Do the + * same if the pop_cb() requested the adjustment */ + if (!u->output_thread_info.push_called || u->output_thread_info.pop_adjust) { + pa_usec_t time_delta; - if (PA_SINK_IS_OPENED(u->sink_input->sink->thread_info.state)) - pa_memblockq_push_align(u->memblockq, chunk); - else - pa_memblockq_flush_write(u->memblockq, true); + time_delta = PA_PTR_TO_UINT(data); + time_delta += pa_rtclock_now() - offset; + time_delta += pa_sink_get_latency_within_thread(u->sink_input->sink); + + /* If the source has overrun, assume that the maximum it should have pushed is + * one full source latency. It may still be possible that the next push also + * contains too much data, then the resulting latency will be wrong. */ + if (pa_bytes_to_usec(chunk->length, &u->sink_input->sample_spec) > u->output_thread_info.effective_source_latency) + time_delta = PA_CLIP_SUB(time_delta, u->output_thread_info.effective_source_latency); + else + time_delta = PA_CLIP_SUB(time_delta, pa_bytes_to_usec(chunk->length, &u->sink_input->sample_spec)); + + memblockq_adjust(u, time_delta, true); + + u->output_thread_info.pop_adjust = false; + u->output_thread_info.push_called = true; + } + + /* If pop has not been called yet, make sure the latency does not grow too much. + * Don't push any silence here, because we already have new data in the queue */ + if (!u->output_thread_info.pop_called) + memblockq_adjust(u, pa_sink_get_latency_within_thread(u->sink_input->sink), false); /* Is this the end of an underrun? 
Then let's start things * right-away */ - if (!u->in_pop && + if (!u->output_thread_info.in_pop && + u->sink_input->sink->thread_info.state != PA_SINK_SUSPENDED && u->sink_input->thread_info.underrun_for > 0 && pa_memblockq_is_readable(u->memblockq)) { @@ -531,20 +728,15 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in false, true, false); } - u->recv_counter += (int64_t) chunk->length; + u->output_thread_info.recv_counter += (int64_t) chunk->length; return 0; case SINK_INPUT_MESSAGE_REWIND: - pa_sink_input_assert_io_context(u->sink_input); - - if (PA_SINK_IS_OPENED(u->sink_input->sink->thread_info.state)) - pa_memblockq_seek(u->memblockq, -offset, PA_SEEK_RELATIVE, true); - else - pa_memblockq_flush_write(u->memblockq, true); + pa_memblockq_seek(u->memblockq, -offset, PA_SEEK_RELATIVE, true); - u->recv_counter -= offset; + u->output_thread_info.recv_counter -= offset; return 0; @@ -553,7 +745,7 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in length = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq); - u->latency_snapshot.recv_counter = u->recv_counter; + u->latency_snapshot.recv_counter = u->output_thread_info.recv_counter; u->latency_snapshot.loopback_memblockq_length = pa_memblockq_get_length(u->memblockq); /* Add content of render memblockq to sink latency */ u->latency_snapshot.sink_latency = pa_sink_get_latency_within_thread(u->sink_input->sink) + @@ -562,10 +754,45 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in return 0; } + + case SINK_INPUT_MESSAGE_SOURCE_CHANGED: + + u->output_thread_info.push_called = false; + + return 0; + + case SINK_INPUT_MESSAGE_SINK_CHANGED: + + u->output_thread_info.pop_called = false; + u->output_thread_info.first_pop_done = false; + + return 0; + + case SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY: + + u->output_thread_info.effective_source_latency = (pa_usec_t)offset; + + return 0; } return 
pa_sink_input_process_msg(obj, code, data, offset, chunk); } +/* Called from main thread. + * Set sink input latency to one third of the overall latency if possible. + * The choice of one third is rather arbitrary somewhere between the minimum + * possible latency (which would cause a lot of CPU load) and half the configured + * latency (which would lead to an empty memblockq if the source is configured + * likewise). */ +static void set_sink_input_latency(struct userdata *u, pa_sink *sink) { + pa_usec_t latency, requested_latency; + + requested_latency = u->latency / 3; + + latency = PA_CLAMP(requested_latency , u->min_sink_latency, u->max_sink_latency); + u->configured_sink_latency = pa_sink_input_set_requested_latency(u->sink_input, latency); + if (u->configured_sink_latency != requested_latency) + pa_log_warn("Cannot set requested sink latency of %0.2f ms, adjusting to %0.2f ms", (double)requested_latency / PA_USEC_PER_MSEC, (double)u->configured_sink_latency / PA_USEC_PER_MSEC); +} /* Called from output thread context */ static void sink_input_attach_cb(pa_sink_input *i) { @@ -665,12 +892,21 @@ static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) { if ((n = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_ICON_NAME))) pa_source_output_set_property(u->source_output, PA_PROP_MEDIA_ICON_NAME, n); - if (pa_sink_get_state(dest) == PA_SINK_SUSPENDED) - pa_source_output_cork(u->source_output, true); - else + /* Set latency and calculate latency limits */ + update_latency_boundaries(u, NULL, dest); + set_sink_input_latency(u, dest); + get_effective_source_latency(u, u->source_output->source, dest); + + if (pa_sink_get_state(dest) == PA_SINK_SUSPENDED) { + if (dest->suspend_cause != PA_SUSPEND_IDLE) + pa_source_output_cork(u->source_output, true); + } else pa_source_output_cork(u->source_output, false); update_adjust_timer(u); + + /* Send a message to the output thread that the sink has changed */ + pa_asyncmsgq_send(dest->asyncmsgq, 
PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SINK_CHANGED, NULL, 0, NULL); } /* Called from main thread */ @@ -695,6 +931,16 @@ static void sink_input_suspend_cb(pa_sink_input *i, bool suspended) { pa_assert_ctl_context(); pa_assert_se(u = i->userdata); + /* If the sink input has been suspended, we need to handle this like + * a sink change when the sink input is resumed. Because the sink input + * is suspended, we can set the variables directly. */ + if (suspended) { + u->output_thread_info.pop_called = false; + u->output_thread_info.first_pop_done = false; + } else + /* Set effective source latency on unsuspend */ + get_effective_source_latency(u, u->source_output->source, u->sink_input->sink); + pa_source_output_cork(u->source_output, suspended); update_adjust_timer(u); @@ -798,6 +1044,9 @@ int pa__init(pa_module *m) { u->core = m->core; u->module = m; u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC; + u->output_thread_info.pop_called = false; + u->output_thread_info.pop_adjust = false; + u->output_thread_info.push_called = false; adjust_time_sec = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC; if (pa_modargs_get_value_u32(ma, "adjust_time", &adjust_time_sec) < 0) { @@ -876,7 +1125,8 @@ int pa__init(pa_module *m) { u->sink_input->suspend = sink_input_suspend_cb; u->sink_input->userdata = u; - pa_sink_input_set_requested_latency(u->sink_input, u->latency/3); + update_latency_boundaries(u, NULL, u->sink_input->sink); + set_sink_input_latency(u, u->sink_input->sink); pa_source_output_new_data_init(&source_output_data); source_output_data.driver = __FILE__; @@ -927,7 +1177,8 @@ int pa__init(pa_module *m) { u->source_output->suspend = source_output_suspend_cb; u->source_output->userdata = u; - pa_source_output_set_requested_latency(u->source_output, u->latency/3); + update_latency_boundaries(u, u->source_output->source, u->sink_input->sink); + set_source_output_latency(u, u->source_output->source); pa_sink_input_get_silence(u->sink_input, &silence); 
u->memblockq = pa_memblockq_new( @@ -941,6 +1192,8 @@ int pa__init(pa_module *m) { 0, /* maxrewind */ &silence); /* silence frame */ pa_memblock_unref(silence.memblock); + /* Fill the memblockq with silence */ + pa_memblockq_seek(u->memblockq, pa_usec_to_bytes(u->latency, &u->sink_input->sample_spec), PA_SEEK_RELATIVE, true); u->asyncmsgq = pa_asyncmsgq_new(0); if (!u->asyncmsgq) { @@ -964,6 +1217,9 @@ int pa__init(pa_module *m) { && (n = pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_ICON_NAME))) pa_proplist_sets(u->sink_input->proplist, PA_PROP_MEDIA_ICON_NAME, n); + /* The output thread is not yet running, set effective_source_latency directly */ + get_effective_source_latency(u, u->source_output->source, NULL); + pa_sink_input_put(u->sink_input); pa_source_output_put(u->source_output); -- 2.10.1