Hi Georg, On Mon, Mar 5, 2018 at 9:49 AM, Georg Chini <georg at chini.tk> wrote: > The rewrite of the thread function does not change functionality much, > most of it is only cleanup, minor bug fixing and documentation work. > > This patch also changes the send buffer size for a2dp sink to avoid lags > after temporary connection drops, following the proof-of-concept patch > posted by Dmitry Kalyanov. > > Bug-Link: https://bugs.freedesktop.org/show_bug.cgi?id=58746 > --- > src/modules/bluetooth/module-bluez5-device.c | 275 ++++++++++++++++++--------- > 1 file changed, 182 insertions(+), 93 deletions(-) > > diff --git a/src/modules/bluetooth/module-bluez5-device.c b/src/modules/bluetooth/module-bluez5-device.c > index 7970dda7..149d1708 100644 > --- a/src/modules/bluetooth/module-bluez5-device.c > +++ b/src/modules/bluetooth/module-bluez5-device.c > @@ -56,7 +56,6 @@ PA_MODULE_LOAD_ONCE(false); > PA_MODULE_USAGE("path=<device object path>" > "autodetect_mtu=<boolean>"); > > -#define MAX_PLAYBACK_CATCH_UP_USEC (100 * PA_USEC_PER_MSEC) > #define FIXED_LATENCY_PLAYBACK_A2DP (25 * PA_USEC_PER_MSEC) > #define FIXED_LATENCY_PLAYBACK_SCO (125 * PA_USEC_PER_MSEC) > #define FIXED_LATENCY_RECORD_A2DP (25 * PA_USEC_PER_MSEC) > @@ -660,6 +659,38 @@ static int a2dp_process_push(struct userdata *u) { > return ret; > } > > +static void update_buffer_size(struct userdata *u) { > + int old_bufsize; > + socklen_t len = sizeof(int); > + int ret; > + > + ret = getsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &old_bufsize, &len); > + if (ret == -1) { > + pa_log_warn("Changing bluetooth buffer size: Failed to getsockopt(SO_SNDBUF): %s", pa_cstrerror(errno)); > + } else { > + int new_bufsize; > + > + /* Set send buffer size as small as possible. The minimum value is 1024 according to the > + * socket man page. The data is written to the socket in chunks of write_block_size, so > + * there should at least be room for two chunks in the buffer. 
Generally, write_block_size > + * is larger than 512. If not, use the next multiple of write_block_size which is larger > + * than 1024. */ Btw, the man page actually says the minimum is 2048 bytes, but perhaps you are taking into account that the kernel doubles the value set, for bookkeeping? Anyway, getsockopt should always return at least 2048. > + new_bufsize = 2 * u->write_block_size; > + if (new_bufsize < 1024) > + new_bufsize = (1024 / u->write_block_size + 1) * u->write_block_size; > + > + /* The kernel internally doubles the buffer size that was set by setsockopt and getsockopt > + * returns the doubled value. */ Btw, what happens if the buffer size is reduced, for example when the bitpool is reduced, while there are still packets in the buffer? Are they discarded? > + if (new_bufsize != old_bufsize / 2) { > + ret = setsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &new_bufsize, len); > + if (ret == -1) > + pa_log_warn("Changing bluetooth buffer size: Failed to change from %d to %d: %s", old_bufsize / 2, new_bufsize, pa_cstrerror(errno)); > + else > + pa_log_info("Changing bluetooth buffer size: Changed from %d to %d", old_bufsize / 2, new_bufsize); > + } > + } > +} > + > /* Run from I/O thread */ > static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) { > struct sbc_info *sbc_info; > @@ -694,6 +725,15 @@ static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) { > pa_sink_set_max_request_within_thread(u->sink, u->write_block_size); > pa_sink_set_fixed_latency_within_thread(u->sink, > FIXED_LATENCY_PLAYBACK_A2DP + pa_bytes_to_usec(u->write_block_size, &u->sample_spec)); > + > + /* If there is still data in the memchunk, we have to discard it > + * because the write_block_size may have changed. 
*/ > + if (u->write_memchunk.memblock) { > + pa_memblock_unref(u->write_memchunk.memblock); > + pa_memchunk_reset(&u->write_memchunk); > + } > + > + update_buffer_size(u); > } > > /* Run from I/O thread */ > @@ -852,8 +892,10 @@ static void setup_stream(struct userdata *u) { > > pa_log_debug("Stream properly set up, we're ready to roll!"); > > - if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) > + if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) { > a2dp_set_bitpool(u, u->sbc_info.max_bitpool); > + update_buffer_size(u); > + } > > u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1); > pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); > @@ -861,7 +903,7 @@ static void setup_stream(struct userdata *u) { > pollfd->events = pollfd->revents = 0; > > u->read_index = u->write_index = 0; > - u->started_at = 0; > + u->started_at = pa_rtclock_now(); > u->stream_setup_done = true; > > if (u->source) > @@ -1407,12 +1449,32 @@ static int init_profile(struct userdata *u) { > return r; > } > > +static int write_block(struct userdata *u) { > + int n_written; > + > + if (u->write_index <= 0) > + u->started_at = pa_rtclock_now(); > + > + if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) { > + if ((n_written = a2dp_process_render(u)) < 0) > + return -1; > + } else { > + if ((n_written = sco_process_render(u)) < 0) > + return -1; > + } > + > + if (n_written == 0) > + pa_log("Got EAGAIN on write() after POLLOUT, probably there is a temporary connection loss."); > + > + return n_written; > +} > + > + > /* I/O thread function */ > static void thread_func(void *userdata) { > struct userdata *u = userdata; > - unsigned do_write = 0; > - unsigned pending_read_bytes = 0; > - bool writable = false; > + unsigned blocks_to_write = 0; > + unsigned bytes_to_write = 0; > > pa_assert(u); > pa_assert(u->transport); > @@ -1432,9 +1494,13 @@ static void thread_func(void *userdata) { > struct pollfd *pollfd; > int ret; > bool disable_timer = true; > + bool writable = false; > + 
bool have_source = u->source ? PA_SOURCE_IS_LINKED(u->source->thread_info.state) : false; > + bool have_sink = u->sink ? PA_SINK_IS_LINKED(u->sink->thread_info.state) : false; > > pollfd = u->rtpoll_item ? pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL) : NULL; > > + /* Check for stream error or close */ > if (pollfd && (pollfd->revents & ~(POLLOUT|POLLIN))) { > pa_log_info("FD error: %s%s%s%s", > pollfd->revents & POLLERR ? "POLLERR " :"", > @@ -1445,147 +1511,170 @@ static void thread_func(void *userdata) { > if (pollfd->revents & POLLHUP) { > pollfd = NULL; > teardown_stream(u); > - do_write = 0; > - pending_read_bytes = 0; > + blocks_to_write = 0; > + bytes_to_write = 0; > writable = false; > pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), BLUETOOTH_MESSAGE_STREAM_FD_HUP, NULL, 0, NULL, NULL); > } else > goto fail; > } > > - if (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state)) { > + /* If there is a pollfd, the stream is set up and we need to do something */ > + if (pollfd) { > > - /* We should send two blocks to the device before we expect > - * a response. */ > + /* Handle source if present */ > + if (have_source) { > > - if (u->write_index == 0 && u->read_index <= 0) > - do_write = 2; > + /* We should send two blocks to the device before we expect a response. 
*/ > + if (u->write_index == 0 && u->read_index <= 0) > + blocks_to_write = 2; > > - if (pollfd && (pollfd->revents & POLLIN)) { > - int n_read; > + /* If we got woken up by POLLIN let's do some reading */ > + if (pollfd->revents & POLLIN) { > + int n_read; > > - if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE) > - n_read = a2dp_process_push(u); > - else > - n_read = sco_process_push(u); > + if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE) > + n_read = a2dp_process_push(u); > + else > + n_read = sco_process_push(u); > > - if (n_read < 0) > - goto fail; > + if (n_read < 0) > + goto fail; > > - if (n_read > 0) { > - /* We just read something, so we are supposed to write something, too */ > - pending_read_bytes += n_read; > - do_write += pending_read_bytes / u->write_block_size; > - pending_read_bytes = pending_read_bytes % u->write_block_size; > + if (n_read > 0) { > + /* We just read something, so we are supposed to write something, too */ > + bytes_to_write += n_read; > + blocks_to_write += bytes_to_write / u->write_block_size; > + bytes_to_write = bytes_to_write % u->write_block_size; > + } > } > } > - } > > - if (u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state)) { > + /* Handle sink if present */ > + if (have_sink) { > > - if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) > - pa_sink_process_rewind(u->sink, 0); > + /* Process rewinds */ > + if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) > + pa_sink_process_rewind(u->sink, 0); > > - if (pollfd) { > + /* Test if the stream is writable */ > if (pollfd->revents & POLLOUT) > writable = true; > > - if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0 && writable) { > + /* If we have a source, we let the source determine the timing > + * for the sink */ > + if (have_source) { > + > + if (writable && blocks_to_write > 0) { > + int result; > + > + if ((result = write_block(u)) < 0) > + goto fail; > + > + blocks_to_write -= result; > + writable = false; > + } 
> + > + /* There is no source, we have to use the system clock for timing */ > + } else { > + bool have_written = false; > pa_usec_t time_passed; > pa_usec_t audio_sent; > > - /* Hmm, there is no input stream we could synchronize > - * to. So let's do things by time */ > - > time_passed = pa_rtclock_now() - u->started_at; > audio_sent = pa_bytes_to_usec(u->write_index, &u->sample_spec); > > + /* A new block needs to be sent. */ > if (audio_sent <= time_passed) { > - pa_usec_t audio_to_send = time_passed - audio_sent; > + size_t bytes_to_send = pa_usec_to_bytes(time_passed - audio_sent, &u->sample_spec); > > - /* Never try to catch up for more than 100ms */ > - if (u->write_index > 0 && audio_to_send > MAX_PLAYBACK_CATCH_UP_USEC) { > - pa_usec_t skip_usec; > + /* There are more than two blocks that need to be written. > + * We cannot catch up, therefore discard everything older > + * than two block sizes. */ > + if (bytes_to_send > 2 * u->write_block_size) { > uint64_t skip_bytes; > + pa_memchunk tmp; > + size_t mempool_max_block_size = pa_mempool_block_size_max(u->core->mempool); > + pa_usec_t skip_usec; > > - skip_usec = audio_to_send - MAX_PLAYBACK_CATCH_UP_USEC; > - skip_bytes = pa_usec_to_bytes(skip_usec, &u->sample_spec); > + skip_bytes = bytes_to_send - 2 * u->write_block_size; > + skip_usec = pa_bytes_to_usec(skip_bytes, &u->sample_spec); > > - if (skip_bytes > 0) { > - pa_memchunk tmp; > + pa_log_warn("Skipping %llu us (= %llu bytes) in audio stream", > + (unsigned long long) skip_usec, > + (unsigned long long) skip_bytes); > > - pa_log_warn("Skipping %llu us (= %llu bytes) in audio stream", > - (unsigned long long) skip_usec, > - (unsigned long long) skip_bytes); > + while (skip_bytes > 0) { > + size_t bytes_to_render; > > - pa_sink_render_full(u->sink, skip_bytes, &tmp); > - pa_memblock_unref(tmp.memblock); > - u->write_index += skip_bytes; > + if (skip_bytes > mempool_max_block_size) > + bytes_to_render = mempool_max_block_size; > + else > + 
bytes_to_render = skip_bytes; > > - if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) > - a2dp_reduce_bitpool(u); > + pa_sink_render_full(u->sink, bytes_to_render, &tmp); > + pa_memblock_unref(tmp.memblock); > + u->write_index += bytes_to_render; > + skip_bytes -= bytes_to_render; > } > + > + if (u->write_index > 0 && u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) > + a2dp_reduce_bitpool(u); > } > > - do_write = 1; > - pending_read_bytes = 0; > + blocks_to_write = 1; > } > - } > - > - if (writable && do_write > 0) { > - int n_written; > > - if (u->write_index <= 0) > - u->started_at = pa_rtclock_now(); > + /* If the stream is writable, send some data if necessary */ > + if (writable && blocks_to_write > 0) { > + int result; > > - if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) { > - if ((n_written = a2dp_process_render(u)) < 0) > - goto fail; > - } else { > - if ((n_written = sco_process_render(u)) < 0) > + if ((result = write_block(u)) < 0) > goto fail; > - } > - > - if (n_written == 0) > - pa_log("Broken kernel: we got EAGAIN on write() after POLLOUT!"); > > - do_write -= n_written; > - writable = false; > - } > + blocks_to_write -= result; > + writable = false; > + have_written = true; > + } > > - if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0) { > - pa_usec_t sleep_for; > - pa_usec_t time_passed, next_write_at; > - > - if (writable) { > - /* Hmm, there is no input stream we could synchronize > - * to. So let's estimate when we need to wake up the latest */ > - time_passed = pa_rtclock_now() - u->started_at; > - next_write_at = pa_bytes_to_usec(u->write_index, &u->sample_spec); > - sleep_for = time_passed < next_write_at ? 
next_write_at - time_passed : 0; > - /* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */ > - } else > - /* drop stream every 500 ms */ > - sleep_for = PA_USEC_PER_MSEC * 500; > - > - pa_rtpoll_set_timer_relative(u->rtpoll, sleep_for); > - disable_timer = false; > + /* If nothing was written during this iteration, either the stream > + * is not writable or there was no write pending. Set up a timer that > + * will wake up the thread when the next data needs to be written. */ > + if (!have_written) { > + pa_usec_t sleep_for; > + pa_usec_t next_write_at; > + > + if (writable) { > + /* There was no write pending on this iteration of the loop. > + * Let's estimate when we need to wake up next */ > + next_write_at = pa_bytes_to_usec(u->write_index, &u->sample_spec); > + sleep_for = time_passed < next_write_at ? next_write_at - time_passed : 0; > + /* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */ > + } else > + /* We could not write because the stream was not ready. Let's try > + * again in 500 ms and drop audio if we still can't write. The > + * thread will also be woken up when we can write again. */ > + sleep_for = PA_USEC_PER_MSEC * 500; > + > + pa_rtpoll_set_timer_relative(u->rtpoll, sleep_for); > + disable_timer = false; > + } > } > } > + > + /* Set events to wake up the thread */ > + pollfd->events = (short) (((have_sink && !writable) ? POLLOUT : 0) | (have_source ? POLLIN : 0)); > + > } > > if (disable_timer) > pa_rtpoll_set_timer_disabled(u->rtpoll); > > - /* Hmm, nothing to do. Let's sleep */ > - if (pollfd) > - pollfd->events = (short) (((u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state) && !writable) ? POLLOUT : 0) | > - (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state) ? 
POLLIN : 0)); > - > if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) { > pa_log_debug("pa_rtpoll_run failed with: %d", ret); > goto fail; > } > + > if (ret == 0) { > pa_log_debug("IO thread shutdown requested, stopping cleanly"); > transport_release(u); > -- > 2.14.1 > > _______________________________________________ > pulseaudio-discuss mailing list > pulseaudio-discuss at lists.freedesktop.org > https://lists.freedesktop.org/mailman/listinfo/pulseaudio-discuss -- Luiz Augusto von Dentz