Currently module-combine-sink uses only a rough estimate of the current slave sink latencies to calculate the rate for the various sink inputs. This leads to very inexact and unstable latency reports for the virtual sink. This patch fixes the issue by introducing latency snapshots like they are used in module-loopback. It also changes the definition of the target latency to ensure that there is always one sink which uses the base rate. --- src/modules/module-combine-sink.c | 176 +++++++++++++++++++++++++++++++------- 1 file changed, 145 insertions(+), 31 deletions(-) diff --git a/src/modules/module-combine-sink.c b/src/modules/module-combine-sink.c index f7649a36..75677fb0 100644 --- a/src/modules/module-combine-sink.c +++ b/src/modules/module-combine-sink.c @@ -115,6 +115,14 @@ struct output { /* For communication of the stream latencies to the main thread */ pa_usec_t total_latency; + struct { + pa_usec_t timestamp; + pa_usec_t sink_latency; + size_t output_memblockq_size; + uint64_t receive_counter; + } latency_snapshot; + + uint64_t receive_counter; /* For communication of the stream parameters to the sink thread */ pa_atomic_t max_request; @@ -158,21 +166,33 @@ struct userdata { bool in_null_mode; pa_smoother *smoother; uint64_t counter; + + uint64_t snapshot_counter; + pa_usec_t snapshot_time; + + pa_usec_t render_timestamp; } thread_info; }; +struct sink_snapshot { + pa_usec_t timestamp; + uint64_t send_counter; +}; + enum { SINK_MESSAGE_ADD_OUTPUT = PA_SINK_MESSAGE_MAX, SINK_MESSAGE_REMOVE_OUTPUT, SINK_MESSAGE_NEED, SINK_MESSAGE_UPDATE_LATENCY, SINK_MESSAGE_UPDATE_MAX_REQUEST, - SINK_MESSAGE_UPDATE_LATENCY_RANGE + SINK_MESSAGE_UPDATE_LATENCY_RANGE, + SINK_MESSAGE_GET_SNAPSHOT }; enum { SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX, - SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY + SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, + SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT }; static void output_disable(struct output *o); @@ -182,10 +202,16 @@ static int output_create_sink_input(struct output *o); static void adjust_rates(struct userdata *u) { struct output *o; - pa_usec_t max_sink_latency = 0, min_total_latency = (pa_usec_t) -1, target_latency, avg_total_latency = 0; + struct sink_snapshot rdata; + pa_usec_t avg_total_latency = 0; + pa_usec_t target_latency = 0; + pa_usec_t max_sink_latency = 0; + pa_usec_t min_total_latency = (pa_usec_t)-1; uint32_t base_rate; uint32_t idx; unsigned n = 0; + pa_usec_t now; + struct output *o_max; pa_assert(u); pa_sink_assert_ref(u->sink); @@ -193,42 +219,82 @@ static void adjust_rates(struct userdata *u) { if (pa_idxset_size(u->outputs) <= 0) return; - if (!PA_SINK_IS_OPENED(pa_sink_get_state(u->sink))) + if (pa_sink_get_state(u->sink) != PA_SINK_RUNNING) + return; + + /* Get sink snapshot */ + pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_GET_SNAPSHOT, &rdata, 0, NULL); + + /* The sink snapshot time is the time when the last data was rendered. + * Latency is calculated for that point in time. */ + now = rdata.timestamp; + + /* Sink snapshot is not yet valid. */ + if (!now) return; PA_IDXSET_FOREACH(o, u->outputs, idx) { - pa_usec_t sink_latency; + pa_usec_t snapshot_latency; + int64_t time_difference; if (!o->sink_input || !PA_SINK_IS_OPENED(pa_sink_get_state(o->sink))) continue; - o->total_latency = pa_sink_input_get_latency(o->sink_input, &sink_latency); - o->total_latency += sink_latency; - - if (sink_latency > max_sink_latency) - max_sink_latency = sink_latency; - - if (min_total_latency == (pa_usec_t) -1 || o->total_latency < min_total_latency) + /* The difference may become negative, because it is probable, that the last + * render time was before the sink input snapshot. In this case, the sink + * had some more latency at the render time, so subtracting the value still + * gives the right result. */ + time_difference = (int64_t)now - (int64_t)o->latency_snapshot.timestamp; + + /* Latency at sink snapshot time is sink input snapshot latency minus time + * passed between the two snapshots. */ + snapshot_latency = o->latency_snapshot.sink_latency + + pa_bytes_to_usec(o->latency_snapshot.output_memblockq_size, &o->sink_input->sample_spec) + - time_difference; + + /* Add the data that was sent between taking the sink input snapshot + * and the sink snapshot. */ + snapshot_latency += pa_bytes_to_usec(rdata.send_counter - o->latency_snapshot.receive_counter, &o->sink_input->sample_spec); + + /* This is the current combined latency of the slave sink and the related + * memblockq at the time of the sink snapshot. */ + o->total_latency = snapshot_latency; + avg_total_latency += snapshot_latency; + + /* Get max_sink_latency and min_total_latency for target selection. */ + if (min_total_latency == (pa_usec_t)-1 || o->total_latency < min_total_latency) min_total_latency = o->total_latency; - avg_total_latency += o->total_latency; - n++; + if (o->latency_snapshot.sink_latency > max_sink_latency) { + max_sink_latency = o->latency_snapshot.sink_latency; + o_max = o; + } - pa_log_debug("[%s] total=%0.2fms sink=%0.2fms ", o->sink->name, (double) o->total_latency / PA_USEC_PER_MSEC, (double) sink_latency / PA_USEC_PER_MSEC); + /* Debug output */ + pa_log_debug("[%s] Snapshot sink latency = %0.2fms, total snapshot latency = %0.2fms", o->sink->name, (double) o->latency_snapshot.sink_latency / PA_USEC_PER_MSEC, (double) snapshot_latency / PA_USEC_PER_MSEC); if (o->total_latency > 10*PA_USEC_PER_SEC) pa_log_warn("[%s] Total latency of output is very high (%0.2fms), most likely the audio timing in one of your drivers is broken.", o->sink->name, (double) o->total_latency / PA_USEC_PER_MSEC); + + n++; } - if (min_total_latency == (pa_usec_t) -1) + /* If there is no valid output there is nothing to do. */ + if (min_total_latency == (pa_usec_t)-1) return; avg_total_latency /= n; - target_latency = PA_MAX(max_sink_latency, min_total_latency); + /* The target selection ensures, that at least one of the + * sinks will use the base rate and all other sinks are set + * relative to it. */ + if (max_sink_latency > min_total_latency) + target_latency = o_max->total_latency; + else + target_latency = min_total_latency; pa_log_info("[%s] avg total latency is %0.2f msec.", u->sink->name, (double) avg_total_latency / PA_USEC_PER_MSEC); - pa_log_info("[%s] target latency is %0.2f msec.", u->sink->name, (double) target_latency / PA_USEC_PER_MSEC); + pa_log_info("[%s] target latency for all slaves is %0.2f msec.", u->sink->name, (double) target_latency / PA_USEC_PER_MSEC); base_rate = u->sink->sample_spec.rate; @@ -248,14 +314,12 @@ static void adjust_rates(struct userdata *u) { pa_log_warn("[%s] sample rates too different, not adjusting (%u vs. %u).", o->sink_input->sink->name, base_rate, new_rate); new_rate = base_rate; } else { - if (base_rate < new_rate + 20 && new_rate < base_rate + 20) - new_rate = base_rate; /* Do the adjustment in small steps; 2â?° can be considered inaudible */ if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) { pa_log_info("[%s] new rate of %u Hz not within 2â?° of %u Hz, forcing smaller adjustment", o->sink_input->sink->name, new_rate, current_rate); new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002)); } - pa_log_info("[%s] new rate is %u Hz; ratio is %0.3f; latency is %0.2f msec.", o->sink_input->sink->name, new_rate, (double) new_rate / base_rate, (double) o->total_latency / PA_USEC_PER_MSEC); + pa_log_info("[%s] new rate is %u Hz; ratio is %0.3f.", o->sink_input->sink->name, new_rate, (double) new_rate / base_rate); } pa_sink_input_set_rate(o->sink_input, new_rate); } @@ -270,13 +334,22 @@ static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct tim pa_assert(a); pa_assert(u->time_event == e); - adjust_rates(u); - if (pa_sink_get_state(u->sink) == PA_SINK_SUSPENDED) { u->core->mainloop->time_free(e); u->time_event = NULL; - } else + } else { + struct output *o; + uint32_t idx; + pa_core_rttime_restart(u->core, e, pa_rtclock_now() + u->adjust_time); + + /* Get latency snapshots */ + PA_IDXSET_FOREACH(o, u->outputs, idx) { + pa_asyncmsgq_send(o->control_inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL); + } + + } + adjust_rates(u); } static void process_render_null(struct userdata *u, pa_usec_t now) { @@ -386,7 +459,10 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length) while (pa_asyncmsgq_process_one(o->audio_inq) > 0) ; - /* Ok, now let's prepare some data if we really have to */ + /* Ok, now let's prepare some data if we really have to. Save the + * the time for latency calculations. */ + u->thread_info.render_timestamp = pa_rtclock_now(); + while (!pa_memblockq_is_readable(o->memblockq)) { struct output *j; pa_memchunk chunk; @@ -395,6 +471,7 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length) pa_sink_render(u->sink, length, &chunk); u->thread_info.counter += chunk.length; + o->receive_counter += chunk.length; /* OK, let's send this data to the other threads */ PA_LLIST_FOREACH(j, u->thread_info.active_outputs) { @@ -629,9 +706,10 @@ static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64 case SINK_INPUT_MESSAGE_POST: - if (PA_SINK_IS_OPENED(o->sink_input->sink->thread_info.state)) + if (o->sink_input->sink->thread_info.state == PA_SINK_RUNNING) { pa_memblockq_push_align(o->memblockq, chunk); - else + o->receive_counter += chunk->length; + } else pa_memblockq_flush_write(o->memblockq, true); return 0; @@ -643,6 +721,24 @@ static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64 return 0; } + + case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: { + size_t length; + + length = pa_memblockq_get_length(o->sink_input->thread_info.render_memblockq); + + o->latency_snapshot.output_memblockq_size = pa_memblockq_get_length(o->memblockq); + + /* Add content of memblockq's to sink latency */ + o->latency_snapshot.sink_latency = pa_sink_get_latency_within_thread(o->sink, true) + + pa_bytes_to_usec(length, &o->sink->sample_spec); + + o->latency_snapshot.timestamp = pa_rtclock_now(); + + o->latency_snapshot.receive_counter = o->receive_counter; + + return 0; + } } return pa_sink_input_process_msg(obj, code, data, offset, chunk); @@ -734,9 +830,10 @@ static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, running = new_state == PA_SINK_RUNNING; pa_atomic_store(&u->thread_info.running, running); - if (running) + if (running) { + u->thread_info.render_timestamp = 0; pa_smoother_resume(u->thread_info.smoother, pa_rtclock_now(), true); - else + } else pa_smoother_pause(u->thread_info.smoother, pa_rtclock_now()); return 0; @@ -829,6 +926,7 @@ static void output_add_within_thread(struct output *o) { o->userdata->rtpoll, PA_RTPOLL_NORMAL, o->control_inq); + o->receive_counter = o->userdata->thread_info.counter; } /* Called from thread context of the io thread */ @@ -916,8 +1014,11 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse case SINK_MESSAGE_UPDATE_LATENCY: { pa_usec_t x, y, latency = (pa_usec_t) offset; - x = pa_rtclock_now(); - y = pa_bytes_to_usec(u->thread_info.counter, &u->sink->sample_spec); + /* It may be possible that thread_info.counter has been increased + * since we took the snapshot. Therefore we have to use the snapshot + * time and counter instead of the current values. */ + x = u->thread_info.snapshot_time; + y = pa_bytes_to_usec(u->thread_info.snapshot_counter, &u->sink->sample_spec); if (y > latency) y -= latency; @@ -928,6 +1029,17 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse return 0; } + case SINK_MESSAGE_GET_SNAPSHOT: { + struct sink_snapshot *rdata = data; + + rdata->timestamp = u->thread_info.render_timestamp; + rdata->send_counter = u->thread_info.counter; + u->thread_info.snapshot_counter = u->thread_info.counter; + u->thread_info.snapshot_time = u->thread_info.render_timestamp; + + return 0; + } + case SINK_MESSAGE_UPDATE_MAX_REQUEST: update_max_request(u); break; @@ -1504,6 +1616,8 @@ int pa__init(pa_module*m) { u->sink_unlink_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_UNLINK], PA_HOOK_EARLY, (pa_hook_cb_t) sink_unlink_hook_cb, u); u->sink_state_changed_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_STATE_CHANGED], PA_HOOK_NORMAL, (pa_hook_cb_t) sink_state_changed_hook_cb, u); + u->thread_info.render_timestamp = 0; + if (!(u->thread = pa_thread_new("combine", thread_func, u))) { pa_log("Failed to create thread."); goto fail; -- 2.14.1