This patch makes module-loopback converge on the requested latency, where possible, by adjusting the sample rate of the sink input. On startup, the latency is initialized to a value near the requested latency so that the stable state can be reached within a few adjust_time intervals. If the requested latency is smaller than source latency + sink latency + 25 ms, module-loopback instead tries to bring the buffer latency to 25 ms. (This value could also be made user-configurable if necessary.) --- src/modules/module-loopback.c | 150 +++++++++++++++++++++--------------------- 1 file changed, 74 insertions(+), 76 deletions(-) diff --git a/src/modules/module-loopback.c b/src/modules/module-loopback.c index b3b9557..aff42ab 100644 --- a/src/modules/module-loopback.c +++ b/src/modules/module-loopback.c @@ -65,6 +65,8 @@ PA_MODULE_USAGE( #define DEFAULT_ADJUST_TIME_USEC (10*PA_USEC_PER_SEC) +#define MIN_BUFFER_USEC (25*PA_USEC_PER_MSEC) + struct userdata { pa_core *core; pa_module *module; @@ -87,7 +89,7 @@ struct userdata { pa_usec_t latency; bool in_pop; - size_t min_memblockq_length; + bool pop_called; struct { int64_t send_counter; @@ -98,8 +100,7 @@ struct userdata { size_t sink_input_buffer; pa_usec_t sink_latency; - size_t min_memblockq_length; - size_t max_request; + pa_usec_t buffer_latency; } latency_snapshot; }; @@ -124,7 +125,6 @@ enum { SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX, SINK_INPUT_MESSAGE_REWIND, SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, - SINK_INPUT_MESSAGE_MAX_REQUEST_CHANGED }; enum { @@ -171,9 +171,10 @@ static void teardown(struct userdata *u) { /* Called from main context */ static void adjust_rates(struct userdata *u) { - size_t buffer, fs; + size_t buffer; uint32_t old_rate, base_rate, new_rate; - pa_usec_t buffer_latency; + pa_usec_t req_buffer_latency, final_latency; + double step_size; pa_assert(u); pa_assert_ctl_context(); @@ -188,40 +189,40 @@ static void adjust_rates(struct userdata *u) { if (u->latency_snapshot.recv_counter <= 
u->latency_snapshot.send_counter) buffer += (size_t) (u->latency_snapshot.send_counter - u->latency_snapshot.recv_counter); else - buffer += PA_CLIP_SUB(buffer, (size_t) (u->latency_snapshot.recv_counter - u->latency_snapshot.send_counter)); + buffer = PA_CLIP_SUB(buffer, (size_t) (u->latency_snapshot.recv_counter - u->latency_snapshot.send_counter)); - buffer_latency = pa_bytes_to_usec(buffer, &u->sink_input->sample_spec); + u->latency_snapshot.buffer_latency = pa_bytes_to_usec(buffer, &u->sink_input->sample_spec); pa_log_debug("Loopback overall latency is %0.2f ms + %0.2f ms + %0.2f ms = %0.2f ms", (double) u->latency_snapshot.sink_latency / PA_USEC_PER_MSEC, - (double) buffer_latency / PA_USEC_PER_MSEC, + (double) u->latency_snapshot.buffer_latency / PA_USEC_PER_MSEC, (double) u->latency_snapshot.source_latency / PA_USEC_PER_MSEC, - ((double) u->latency_snapshot.sink_latency + buffer_latency + u->latency_snapshot.source_latency) / PA_USEC_PER_MSEC); - - pa_log_debug("Should buffer %zu bytes, buffered at minimum %zu bytes", - u->latency_snapshot.max_request*2, - u->latency_snapshot.min_memblockq_length); + ((double) u->latency_snapshot.sink_latency + u->latency_snapshot.buffer_latency + u->latency_snapshot.source_latency) / PA_USEC_PER_MSEC); - fs = pa_frame_size(&u->sink_input->sample_spec); old_rate = u->sink_input->sample_spec.rate; base_rate = u->source_output->sample_spec.rate; - if (u->latency_snapshot.min_memblockq_length < u->latency_snapshot.max_request*2) - new_rate = base_rate - (((u->latency_snapshot.max_request*2 - u->latency_snapshot.min_memblockq_length) / fs) *PA_USEC_PER_SEC)/u->adjust_time; - else - new_rate = base_rate + (((u->latency_snapshot.min_memblockq_length - u->latency_snapshot.max_request*2) / fs) *PA_USEC_PER_SEC)/u->adjust_time; + final_latency = PA_MAX(u->latency, u->latency_snapshot.sink_latency + u->latency_snapshot.source_latency + MIN_BUFFER_USEC); + req_buffer_latency = final_latency - u->latency_snapshot.sink_latency - 
u->latency_snapshot.source_latency; + new_rate = base_rate * 1.01; + if ((int32_t)(u->adjust_time + req_buffer_latency - u->latency_snapshot.buffer_latency) > 0) + new_rate = base_rate * u->adjust_time/(u->adjust_time + req_buffer_latency - u->latency_snapshot.buffer_latency); - if (new_rate < (uint32_t) (base_rate*0.8) || new_rate > (uint32_t) (base_rate*1.25)) { - pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate, new_rate); - new_rate = base_rate; - } else { - if (base_rate < new_rate + 20 && new_rate < base_rate + 20) - new_rate = base_rate; - /* Do the adjustment in small steps; 2â?° can be considered inaudible */ - if (new_rate < (uint32_t) (old_rate*0.998) || new_rate > (uint32_t) (old_rate*1.002)) { - pa_log_info("New rate of %u Hz not within 2â?° of %u Hz, forcing smaller adjustment", new_rate, old_rate); - new_rate = PA_CLAMP(new_rate, (uint32_t) (old_rate*0.998), (uint32_t) (old_rate*1.002)); - } + if (base_rate < new_rate + 10 && new_rate < base_rate + 10) + new_rate = base_rate; + + /* Do not deviate more than between 1% from the base sample rate */ + if (new_rate < (uint32_t) (base_rate*0.99) || new_rate > (uint32_t) (base_rate*1.01)) { + pa_log_info("Sample rates too different (%u vs. 
%u), limiting rate adjustment.", base_rate, new_rate); + new_rate = PA_CLAMP(new_rate, (uint32_t) (base_rate*0.99), (uint32_t) (base_rate*1.01)); + } + /* Do the adjustment in small steps; 2-2.5â?° can be considered inaudible */ + step_size = 0.002; + if (abs((int32_t)(new_rate - base_rate)) < abs((int32_t)(old_rate - base_rate))) + step_size = 0.0025; + if (new_rate < (uint32_t) (old_rate*(1-step_size)) || new_rate > (uint32_t) (old_rate*(1+step_size))) { + pa_log_info("New rate of %u Hz not within %0.1fâ?° of %u Hz, forcing smaller adjustment", new_rate, step_size * 1000, old_rate); + new_rate = PA_CLAMP(new_rate, (uint32_t) (old_rate*(1-step_size)), (uint32_t) (old_rate*(1+step_size))); } pa_sink_input_set_rate(u->sink_input, new_rate); @@ -442,20 +443,6 @@ static void source_output_suspend_cb(pa_source_output *o, bool suspended) { } /* Called from output thread context */ -static void update_min_memblockq_length(struct userdata *u) { - size_t length; - - pa_assert(u); - pa_sink_input_assert_io_context(u->sink_input); - - length = pa_memblockq_get_length(u->memblockq); - - if (u->min_memblockq_length == (size_t) -1 || - length < u->min_memblockq_length) - u->min_memblockq_length = length; -} - -/* Called from output thread context */ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) { struct userdata *u; @@ -464,6 +451,8 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk pa_assert_se(u = i->userdata); pa_assert(chunk); + if (PA_SINK_IS_RUNNING(u->sink_input->sink->thread_info.state) && !u->pop_called) + u->pop_called = true; u->in_pop = true; while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0) ; @@ -477,8 +466,6 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk chunk->length = PA_MIN(chunk->length, nbytes); pa_memblockq_drop(u->memblockq, chunk->length); - update_min_memblockq_length(u); - return 0; } @@ -494,6 +481,28 @@ static void 
sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) { } /* Called from output thread context */ +static void memblockq_adjust(struct userdata *u, size_t length) { + int32_t latency_diff; + uint32_t latency; + size_t nbytes; + + if (length) { + nbytes = PA_MIN(length, pa_memblockq_get_length(u->memblockq)); + pa_log_debug("Dropping %u Bytes from queue", (uint32_t) nbytes); + pa_memblockq_drop(u->memblockq, nbytes); + return; + } + latency = PA_MAX(u->latency_snapshot.sink_latency + u->latency_snapshot.source_latency + MIN_BUFFER_USEC, u->latency); + latency_diff = u->latency_snapshot.sink_latency + u->latency_snapshot.buffer_latency + u->latency_snapshot.source_latency - latency; + if (latency_diff > 0) { + nbytes = PA_MIN(pa_usec_to_bytes(latency_diff, &u->sink_input->sample_spec), pa_memblockq_get_length(u->memblockq)); + pa_log_debug("Dropping %u Bytes from queue", (uint32_t) nbytes); + pa_memblockq_drop(u->memblockq, nbytes); +/* u->latency_snapshot.buffer_latency = latency - u->latency_snapshot.source_latency - u->latency_snapshot.sink_latency; */ + } +} + +/* Called from output thread context */ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct userdata *u = PA_SINK_INPUT(obj)->userdata; @@ -515,12 +524,12 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in pa_sink_input_assert_io_context(u->sink_input); - if (PA_SINK_IS_OPENED(u->sink_input->sink->thread_info.state)) + if (PA_SINK_IS_RUNNING(u->sink_input->sink->thread_info.state) && u->pop_called) pa_memblockq_push_align(u->memblockq, chunk); - else - pa_memblockq_flush_write(u->memblockq, true); - - update_min_memblockq_length(u); + else { + pa_memblockq_push_align(u->memblockq, chunk); + memblockq_adjust(u, chunk->length); + } /* Is this the end of an underrun? 
Then let's start things * right-away */ @@ -542,22 +551,18 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in pa_sink_input_assert_io_context(u->sink_input); - if (PA_SINK_IS_OPENED(u->sink_input->sink->thread_info.state)) + if (PA_SINK_IS_RUNNING(u->sink_input->sink->thread_info.state) && u->pop_called) pa_memblockq_seek(u->memblockq, -offset, PA_SEEK_RELATIVE, true); - else - pa_memblockq_flush_write(u->memblockq, true); + else if (u->latency_snapshot.buffer_latency) + memblockq_adjust(u, 0); u->recv_counter -= offset; - update_min_memblockq_length(u); - return 0; case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: { size_t length; - update_min_memblockq_length(u); - length = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq); u->latency_snapshot.recv_counter = u->recv_counter; @@ -566,25 +571,9 @@ static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, in (u->sink_input->thread_info.resampler ? pa_resampler_request(u->sink_input->thread_info.resampler, length) : length); u->latency_snapshot.sink_latency = pa_sink_get_latency_within_thread(u->sink_input->sink); - u->latency_snapshot.max_request = pa_sink_input_get_max_request(u->sink_input); - - u->latency_snapshot.min_memblockq_length = u->min_memblockq_length; - u->min_memblockq_length = (size_t) -1; - return 0; } - case SINK_INPUT_MESSAGE_MAX_REQUEST_CHANGED: { - /* This message is sent from the IO thread to the main - * thread! So don't be confused. All the user cases above - * are executed in thread context, but this one is not! 
*/ - - pa_assert_ctl_context(); - - if (u->time_event) - adjust_rates(u); - return 0; - } } return pa_sink_input_process_msg(obj, code, data, offset, chunk); @@ -606,7 +595,7 @@ static void sink_input_attach_cb(pa_sink_input *i) { pa_memblockq_set_prebuf(u->memblockq, pa_sink_input_get_max_request(i)*2); pa_memblockq_set_maxrewind(u->memblockq, pa_sink_input_get_max_rewind(i)); - u->min_memblockq_length = (size_t) -1; + u->pop_called = false; } /* Called from output thread context */ @@ -621,6 +610,7 @@ static void sink_input_detach_cb(pa_sink_input *i) { pa_rtpoll_item_free(u->rtpoll_item_read); u->rtpoll_item_read = NULL; } + memblockq_adjust(u, 0); } /* Called from output thread context */ @@ -644,7 +634,6 @@ static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) { pa_memblockq_set_prebuf(u->memblockq, nbytes*2); pa_log_info("Max request changed"); - pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_MAX_REQUEST_CHANGED, NULL, 0, NULL, NULL); } /* Called from main thread */ @@ -746,6 +735,7 @@ int pa__init(pa_module *m) { uint32_t adjust_time_sec; const char *n; bool remix = true; + pa_usec_t counter; pa_assert(m); @@ -820,6 +810,7 @@ int pa__init(pa_module *m) { u->core = m->core; u->module = m; u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC; + u->pop_called = false; adjust_time_sec = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC; if (pa_modargs_get_value_u32(ma, "adjust_time", &adjust_time_sec) < 0) { @@ -962,6 +953,13 @@ int pa__init(pa_module *m) { 0, /* minreq */ 0, /* maxrewind */ &silence); /* silence frame */ + silence.index = silence.length - pa_usec_to_bytes(PA_USEC_PER_MSEC * 100, &u->source_output->sample_spec); + silence.length = pa_usec_to_bytes(PA_USEC_PER_MSEC * 100, &u->source_output->sample_spec); + counter = 150 * PA_USEC_PER_MSEC; + while ( counter < u->latency) { + pa_memblockq_push_align(u->memblockq, &silence); + counter += 100 * PA_USEC_PER_MSEC; + } 
pa_memblock_unref(silence.memblock); u->asyncmsgq = pa_asyncmsgq_new(0); -- 2.1.3