Ok, I went back to the calculations and found what was wrong; the few samples played back during the time of the actual fill-up were not properly handled. The unix_write part is untested. Otherwise fixed according to Tanuk's suggestions (hopefully I didn't miss anything). So what do you think remains before this patch can go in? Does anybody want to do some testing? Signed-off-by: David Henningsson <david.henningsson at canonical.com> --- src/modules/alsa/alsa-sink.c | 72 ++++++++++++++++++++++++++++++--------- src/pulsecore/protocol-native.c | 58 ++++++++++++++++++++++--------- src/pulsecore/sink-input.c | 28 +++++++++++++-- src/pulsecore/sink-input.h | 8 +++++ src/pulsecore/sink.c | 23 +++++++++++++ src/pulsecore/sink.h | 2 ++ 6 files changed, 157 insertions(+), 34 deletions(-) diff --git a/src/modules/alsa/alsa-sink.c b/src/modules/alsa/alsa-sink.c index 69d006e..44ffdaa 100644 --- a/src/modules/alsa/alsa-sink.c +++ b/src/modules/alsa/alsa-sink.c @@ -146,6 +146,8 @@ struct userdata { uint64_t since_start; pa_usec_t smoother_interval; pa_usec_t last_smoother_update; + int64_t since_fill; + int64_t last_avail; pa_idxset *formats; @@ -508,7 +510,7 @@ static size_t check_left_to_play(struct userdata *u, size_t n_bytes, pa_bool_t o static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polled, pa_bool_t on_timeout) { pa_bool_t work_done = FALSE; pa_usec_t max_sleep_usec = 0, process_usec = 0; - size_t left_to_play; + size_t left_to_play, input_underrun; unsigned j = 0; pa_assert(u); @@ -535,11 +537,14 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle } n_bytes = (size_t) n * u->frame_size; + u->since_fill += n_bytes - u->last_avail; #ifdef DEBUG_TIMING - pa_log_debug("avail: %lu", (unsigned long) n_bytes); + pa_log_debug("avail: %lu, since fill: %ld, advance: %ld", (unsigned long) n_bytes, + (long) u->since_fill, (long) (n_bytes - u->last_avail)); #endif + u->last_avail = n_bytes; left_to_play = 
check_left_to_play(u, n_bytes, on_timeout); on_timeout = FALSE; @@ -600,6 +605,7 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle const snd_pcm_channel_area_t *areas; snd_pcm_uframes_t offset, frames; snd_pcm_sframes_t sframes; + size_t written; frames = (snd_pcm_uframes_t) (n_bytes / u->frame_size); /* pa_log_debug("%lu frames to write", (unsigned long) frames); */ @@ -635,12 +641,14 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle p = (uint8_t*) areas[0].addr + (offset * u->frame_size); - chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, frames * u->frame_size, TRUE); + written = frames * u->frame_size; + chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, written, TRUE); chunk.length = pa_memblock_get_length(chunk.memblock); chunk.index = 0; pa_sink_render_into_full(u->sink, &chunk); pa_memblock_unref_fixed(chunk.memblock); + u->since_fill = 0; if (PA_UNLIKELY((sframes = snd_pcm_mmap_commit(u->pcm_handle, offset, frames)) < 0)) { @@ -655,21 +663,26 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle work_done = TRUE; - u->write_count += frames * u->frame_size; - u->since_start += frames * u->frame_size; + u->write_count += written; + u->since_start += written; + u->last_avail -= written; #ifdef DEBUG_TIMING - pa_log_debug("Wrote %lu bytes (of possible %lu bytes)", (unsigned long) (frames * u->frame_size), (unsigned long) n_bytes); + pa_log_debug("Wrote %lu bytes (of possible %lu bytes)", (unsigned long) written, (unsigned long) n_bytes); #endif - if ((size_t) frames * u->frame_size >= n_bytes) + if (written >= n_bytes) break; - n_bytes -= (size_t) frames * u->frame_size; + n_bytes -= written; } } + input_underrun = pa_sink_process_input_underruns(u->sink, u->since_fill, left_to_play); + if (u->use_tsched) { + pa_usec_t underrun_sleep = pa_bytes_to_usec_round_up(input_underrun, &u->sink->sample_spec); + *sleep_usec = 
pa_bytes_to_usec(left_to_play, &u->sink->sample_spec); process_usec = pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec); @@ -677,6 +690,8 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle *sleep_usec -= process_usec; else *sleep_usec = 0; + + *sleep_usec = PA_MIN(*sleep_usec, underrun_sleep); } else *sleep_usec = 0; @@ -686,7 +701,7 @@ static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polled, pa_bool_t on_timeout) { pa_bool_t work_done = FALSE; pa_usec_t max_sleep_usec = 0, process_usec = 0; - size_t left_to_play; + size_t left_to_play, input_underrun; unsigned j = 0; pa_assert(u); @@ -710,6 +725,14 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle } n_bytes = (size_t) n * u->frame_size; + u->since_fill += n_bytes - u->last_avail; + +#ifdef DEBUG_TIMING + pa_log_debug("avail: %lu, since fill: %ld, advance: %ld", (unsigned long) n_bytes, + (long) u->since_fill, (long) (n_bytes - u->last_avail)); +#endif + + u->last_avail = n_bytes; left_to_play = check_left_to_play(u, n_bytes, on_timeout); on_timeout = FALSE; @@ -754,11 +777,14 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle for (;;) { snd_pcm_sframes_t frames; void *p; + size_t written; /* pa_log_debug("%lu frames to write", (unsigned long) frames); */ - if (u->memchunk.length <= 0) + if (u->memchunk.length <= 0) { pa_sink_render(u->sink, n_bytes, &u->memchunk); + u->since_fill = 0; + } pa_assert(u->memchunk.length > 0); @@ -788,8 +814,9 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle pa_assert(frames > 0); after_avail = FALSE; - u->memchunk.index += (size_t) frames * u->frame_size; - u->memchunk.length -= (size_t) frames * u->frame_size; + written = frames * u->frame_size; + u->memchunk.index += written; + u->memchunk.length -= written; if (u->memchunk.length <= 
0) { pa_memblock_unref(u->memchunk.memblock); @@ -798,19 +825,24 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle work_done = TRUE; - u->write_count += frames * u->frame_size; - u->since_start += frames * u->frame_size; + u->write_count += written; + u->since_start += written; + u->last_avail -= written; /* pa_log_debug("wrote %lu frames", (unsigned long) frames); */ - if ((size_t) frames * u->frame_size >= n_bytes) + if (written >= n_bytes) break; - n_bytes -= (size_t) frames * u->frame_size; + n_bytes -= written; } } + input_underrun = pa_sink_process_input_underruns(u->sink, u->since_fill, left_to_play); + if (u->use_tsched) { + pa_usec_t underrun_sleep = pa_bytes_to_usec_round_up(input_underrun, &u->sink->sample_spec); + *sleep_usec = pa_bytes_to_usec(left_to_play, &u->sink->sample_spec); process_usec = pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec); @@ -818,6 +850,8 @@ static int unix_write(struct userdata *u, pa_usec_t *sleep_usec, pa_bool_t polle *sleep_usec -= process_usec; else *sleep_usec = 0; + + *sleep_usec = PA_MIN(*sleep_usec, underrun_sleep); } else *sleep_usec = 0; @@ -1099,6 +1133,8 @@ static int unsuspend(struct userdata *u) { pa_smoother_reset(u->smoother, pa_rtclock_now(), TRUE); u->smoother_interval = SMOOTHER_MIN_INTERVAL; u->last_smoother_update = 0; + u->since_fill = 0; + u->last_avail = u->hwbuf_size; u->first = TRUE; u->since_start = 0; @@ -1663,6 +1699,8 @@ static int process_rewind(struct userdata *u) { pa_log_info("Tried rewind, but was apparently not possible."); else { u->write_count -= rewind_nbytes; + u->since_fill -= rewind_nbytes; + u->last_avail += rewind_nbytes; pa_log_debug("Rewound %lu bytes.", (unsigned long) rewind_nbytes); pa_sink_process_rewind(u->sink, rewind_nbytes); @@ -2311,6 +2349,8 @@ pa_sink *pa_alsa_sink_new(pa_module *m, pa_modargs *ma, const char*driver, pa_ca u->frame_size = frame_size; u->fragment_size = frag_size = (size_t) (period_frames * frame_size); 
u->hwbuf_size = buffer_size = (size_t) (buffer_frames * frame_size); + u->last_avail = u->hwbuf_size; + pa_cvolume_mute(&u->hardware_volume, u->sink->sample_spec.channels); pa_log_info("Using %0.1f fragments of size %lu bytes (%0.2fms), buffer size is %lu bytes (%0.2fms)", diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c index a4ee920..ae467f4 100644 --- a/src/pulsecore/protocol-native.c +++ b/src/pulsecore/protocol-native.c @@ -231,6 +231,7 @@ enum { CONNECTION_MESSAGE_REVOKE }; +static bool sink_input_process_underrun_cb(pa_sink_input *i); static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk); static void sink_input_kill_cb(pa_sink_input *i); static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend); @@ -1171,6 +1172,7 @@ static playback_stream* playback_stream_new( s->sink_input->parent.process_msg = sink_input_process_msg; s->sink_input->pop = sink_input_pop_cb; + s->sink_input->process_underrun = sink_input_process_underrun_cb; s->sink_input->process_rewind = sink_input_process_rewind_cb; s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb; s->sink_input->update_max_request = sink_input_update_max_request_cb; @@ -1573,6 +1575,44 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int return pa_sink_input_process_msg(o, code, userdata, offset, chunk); } + +static bool handle_input_underrun(playback_stream *s, bool force) +{ + bool send_drain; + + if (pa_memblockq_is_readable(s->memblockq)) + return false; + + if (!s->is_underrun) + pa_log_debug("%s %s of '%s'", force ? "Actual" : "Implicit", + s->drain_request ? 
"drain" : "underrun", pa_strnull(pa_proplist_gets(s->sink_input->proplist, PA_PROP_MEDIA_NAME))); + + send_drain = s->drain_request && (force || pa_sink_input_safe_to_remove(s->sink_input)); + + if (send_drain) { + s->drain_request = false; + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL); + pa_log_debug("Drain acknowledged of '%s'", pa_strnull(pa_proplist_gets(s->sink_input->proplist, PA_PROP_MEDIA_NAME))); + } else if (!s->is_underrun) { + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, pa_memblockq_get_read_index(s->memblockq), NULL, NULL); + } + s->is_underrun = true; + playback_stream_request_bytes(s); + return true; +} + +/* Called from thread context */ +static bool sink_input_process_underrun_cb(pa_sink_input *i) { + playback_stream *s; + + pa_sink_input_assert_ref(i); + s = PLAYBACK_STREAM(i->userdata); + playback_stream_assert_ref(s); + + return handle_input_underrun(s, true); +} + + /* Called from thread context */ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) { playback_stream *s; @@ -1586,22 +1626,8 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); #endif - if (pa_memblockq_is_readable(s->memblockq)) - s->is_underrun = FALSE; - else { - if (!s->is_underrun) - pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq)); - - if (s->drain_request && pa_sink_input_safe_to_remove(i)) { - s->drain_request = FALSE; - pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL); - } else if (!s->is_underrun) - 
pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, pa_memblockq_get_read_index(s->memblockq), NULL, NULL); - - s->is_underrun = TRUE; - - playback_stream_request_bytes(s); - } + if (!handle_input_underrun(s, false)) + s->is_underrun = false; /* This call will not fail with prebuf=0, hence we check for underrun explicitly above */ diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c index 519f47e..621e2a7 100644 --- a/src/pulsecore/sink-input.c +++ b/src/pulsecore/sink-input.c @@ -532,6 +532,7 @@ int pa_sink_input_new( i->thread_info.rewrite_flush = FALSE; i->thread_info.dont_rewind_render = FALSE; i->thread_info.underrun_for = (uint64_t) -1; + i->thread_info.underrun_for_sink = 0; i->thread_info.playing_for = 0; i->thread_info.direct_outputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func); @@ -827,7 +828,7 @@ pa_usec_t pa_sink_input_get_latency(pa_sink_input *i, pa_usec_t *sink_latency) { } /* Called from thread context */ -void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, pa_memchunk *chunk, pa_cvolume *volume) { +void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink bytes */, pa_memchunk *chunk, pa_cvolume *volume) { pa_bool_t do_volume_adj_here, need_volume_factor_sink; pa_bool_t volume_is_norm; size_t block_size_max_sink, block_size_max_sink_input; @@ -896,8 +897,10 @@ void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, p pa_memblockq_seek(i->thread_info.render_memblockq, (int64_t) slength, PA_SEEK_RELATIVE, TRUE); i->thread_info.playing_for = 0; - if (i->thread_info.underrun_for != (uint64_t) -1) + if (i->thread_info.underrun_for != (uint64_t) -1) { i->thread_info.underrun_for += ilength_full; + i->thread_info.underrun_for_sink += slength; + } break; } @@ -907,6 +910,7 @@ void pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, p pa_assert(tchunk.memblock); 
i->thread_info.underrun_for = 0; + i->thread_info.underrun_for_sink = 0; i->thread_info.playing_for += tchunk.length; while (tchunk.length > 0) { @@ -1020,6 +1024,25 @@ void pa_sink_input_drop(pa_sink_input *i, size_t nbytes /* in sink sample spec * } /* Called from thread context */ +bool pa_sink_input_process_underrun(pa_sink_input *i, size_t nbytes /* in the sink's sample spec */) { + pa_sink_input_assert_ref(i); + pa_sink_input_assert_io_context(i); + + if (nbytes < pa_memblockq_get_maxrewind(i->thread_info.render_memblockq)) + return false; + if (pa_memblockq_is_readable(i->thread_info.render_memblockq)) + return false; + + if (i->process_underrun && i->process_underrun(i)) { + /* All valid data has been played back, so we can empty this queue. */ + pa_memblockq_silence(i->thread_info.render_memblockq); + return true; + } + return false; +} + + +/* Called from thread context */ void pa_sink_input_process_rewind(pa_sink_input *i, size_t nbytes /* in sink sample spec */) { size_t lbq; pa_bool_t called = FALSE; @@ -1903,6 +1926,7 @@ void pa_sink_input_set_state_within_thread(pa_sink_input *i, pa_sink_input_state pa_log_debug("Requesting rewind due to uncorking"); i->thread_info.underrun_for = (uint64_t) -1; + i->thread_info.underrun_for_sink = 0; i->thread_info.playing_for = 0; /* Set the uncorked state *before* requesting rewind */ diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h index 8bd5912..5790589 100644 --- a/src/pulsecore/sink-input.h +++ b/src/pulsecore/sink-input.h @@ -135,6 +135,11 @@ struct pa_sink_input { * the full block. */ int (*pop) (pa_sink_input *i, size_t request_nbytes, pa_memchunk *chunk); /* may NOT be NULL */ + /* This is called when the playback buffer has actually played back + all available data. Return true unless there is more data to play back. + Called from IO context. */ + bool (*process_underrun) (pa_sink_input *i); + /* Rewind the queue by the specified number of bytes. 
Called just * before peek() if it is called at all. Only called if the sink * input driver ever plans to call @@ -232,6 +237,7 @@ struct pa_sink_input { pa_bool_t rewrite_flush:1, dont_rewind_render:1; size_t rewrite_nbytes; uint64_t underrun_for, playing_for; + uint64_t underrun_for_sink; /* Like underrun_for, but in sink sample spec */ pa_sample_spec sample_spec; @@ -407,6 +413,8 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t pa_usec_t pa_sink_input_set_requested_latency_within_thread(pa_sink_input *i, pa_usec_t usec); pa_bool_t pa_sink_input_safe_to_remove(pa_sink_input *i); +bool pa_sink_input_process_underrun(pa_sink_input *i, size_t nbytes /* in the sink's sample spec */); + pa_memchunk* pa_sink_input_get_silence(pa_sink_input *i, pa_memchunk *ret); diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c index 6ebe956..1d9e296 100644 --- a/src/pulsecore/sink.c +++ b/src/pulsecore/sink.c @@ -915,6 +915,29 @@ void pa_sink_move_all_fail(pa_queue *q) { pa_queue_free(q, NULL); } + /* Called from IO thread context */ +size_t pa_sink_process_input_underruns(pa_sink *s, size_t since_fill, size_t left_to_play) { + pa_sink_input *i; + void *state = NULL; + size_t result = 0; + + pa_sink_assert_ref(s); + pa_sink_assert_io_context(s); + PA_HASHMAP_FOREACH(i, s->thread_info.inputs, state) { + size_t uf = i->thread_info.underrun_for_sink; + if (uf == 0) + continue; + if (pa_sink_input_process_underrun(i, uf + since_fill)) + continue; + if (uf < left_to_play && uf > result) + result = uf; + } + + if (result > 0) + pa_log_debug("Found underrun %ld bytes ago (%ld bytes ahead in playback buffer)", (long) result, (long) left_to_play - result); + return left_to_play - result; +} + /* Called from IO thread context */ void pa_sink_process_rewind(pa_sink *s, size_t nbytes) { pa_sink_input *i; diff --git a/src/pulsecore/sink.h b/src/pulsecore/sink.h index fcda5ef..0803216 100644 --- a/src/pulsecore/sink.h +++ b/src/pulsecore/sink.h @@ -492,6 
+492,8 @@ void pa_sink_update_volume_and_mute(pa_sink *s); pa_bool_t pa_sink_volume_change_apply(pa_sink *s, pa_usec_t *usec_to_next); +size_t pa_sink_process_input_underruns(pa_sink *s, size_t since_fill, size_t left_to_play); + /*** To be called exclusively by sink input drivers, from IO context */ void pa_sink_request_rewind(pa_sink*s, size_t nbytes); -- 1.7.9.5