On 26.04.2018 15:13, Tanu Kaskinen wrote:
> On Thu, 2018-03-29 at 15:33 +0200, Georg Chini wrote:
>> The rewrite of the thread function does not change functionality much,
>> most of it is only cleanup, minor bug fixing and documentation work.
>>
>> This patch also changes the send buffer size for a2dp sink to avoid lags
>> after temporary connection drops, following the proof-of-concept patch
>> posted by Dmitry Kalyanov.
>>
>> Bug-Link: https://bugs.freedesktop.org/show_bug.cgi?id=58746
>> ---
>> Changes in v2:
>> - fix issues pointed out by Tanu
>> - set writable to false for HSP only if a block really needs to be written
>>
>> src/modules/bluetooth/module-bluez5-device.c | 289 ++++++++++++++++++---------
>> 1 file changed, 191 insertions(+), 98 deletions(-)
>>
>> diff --git a/src/modules/bluetooth/module-bluez5-device.c b/src/modules/bluetooth/module-bluez5-device.c
>> index c3acc1dc..dac1eb2a 100644
>> --- a/src/modules/bluetooth/module-bluez5-device.c
>> +++ b/src/modules/bluetooth/module-bluez5-device.c
>> @@ -56,9 +56,8 @@ PA_MODULE_LOAD_ONCE(false);
>>  PA_MODULE_USAGE("path=<device object path>"
>>                  "autodetect_mtu=<boolean>");
>>
>> -#define MAX_PLAYBACK_CATCH_UP_USEC (100 * PA_USEC_PER_MSEC)
>>  #define FIXED_LATENCY_PLAYBACK_A2DP (25 * PA_USEC_PER_MSEC)
>> -#define FIXED_LATENCY_PLAYBACK_SCO (125 * PA_USEC_PER_MSEC)
>> +#define FIXED_LATENCY_PLAYBACK_SCO (25 * PA_USEC_PER_MSEC)
> Why is this changed? The commit message mentions nothing about this.

Oops, sorry, I forgot to mention it. I changed it because it gives much
better A/V sync for SCO. When testing the A2DP delay issue, I also tested
HSP. In fact, since HSP uses a synchronous connection, the delay for SCO
playback/record should be smaller than for A2DP, but I decided to use the
same value for all four constants to keep it simple.
>
>>  #define FIXED_LATENCY_RECORD_A2DP (25 * PA_USEC_PER_MSEC)
>>  #define FIXED_LATENCY_RECORD_SCO (25 * PA_USEC_PER_MSEC)
>>
>> @@ -660,6 +659,38 @@ static int a2dp_process_push(struct userdata *u) {
>>      return ret;
>>  }
>>
>> +static void update_buffer_size(struct userdata *u) {
>> +    int old_bufsize;
>> +    socklen_t len = sizeof(int);
>> +    int ret;
>> +
>> +    ret = getsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &old_bufsize, &len);
>> +    if (ret == -1) {
>> +        pa_log_warn("Changing bluetooth buffer size: Failed to getsockopt(SO_SNDBUF): %s", pa_cstrerror(errno));
>> +    } else {
>> +        int new_bufsize;
>> +
>> +        /* Set send buffer size as small as possible. The minimum value is 1024 according to the
>> +         * socket man page. The data is written to the socket in chunks of write_block_size, so
>> +         * there should at least be room for two chunks in the buffer. Generally, write_block_size
>> +         * is larger than 512. If not, use the next multiple of write_block_size which is larger
>> +         * than 1024. */
>> +        new_bufsize = 2 * u->write_block_size;
>> +        if (new_bufsize < 1024)
>> +            new_bufsize = (1024 / u->write_block_size + 1) * u->write_block_size;
>> +
>> +        /* The kernel internally doubles the buffer size that was set by setsockopt and getsockopt
>> +         * returns the doubled value. */
>> +        if (new_bufsize != old_bufsize / 2) {
>> +            ret = setsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &new_bufsize, len);
>> +            if (ret == -1)
>> +                pa_log_warn("Changing bluetooth buffer size: Failed to change from %d to %d: %s", old_bufsize / 2, new_bufsize, pa_cstrerror(errno));
>> +            else
>> +                pa_log_info("Changing bluetooth buffer size: Changed from %d to %d", old_bufsize / 2, new_bufsize);
>> +        }
>> +    }
>> +}
>> +
>>  /* Run from I/O thread */
>>  static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) {
>>      struct sbc_info *sbc_info;
>> @@ -694,6 +725,15 @@ static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) {
>>      pa_sink_set_max_request_within_thread(u->sink, u->write_block_size);
>>      pa_sink_set_fixed_latency_within_thread(u->sink,
>>              FIXED_LATENCY_PLAYBACK_A2DP + pa_bytes_to_usec(u->write_block_size, &u->sample_spec));
>> +
>> +    /* If there is still data in the memchunk, we have to discard it
>> +     * because the write_block_size may have changed. */
>> +    if (u->write_memchunk.memblock) {
>> +        pa_memblock_unref(u->write_memchunk.memblock);
>> +        pa_memchunk_reset(&u->write_memchunk);
>> +    }
>> +
>> +    update_buffer_size(u);
>>  }
>>
>>  /* Run from I/O thread */
>> @@ -852,8 +892,10 @@ static void setup_stream(struct userdata *u) {
>>
>>      pa_log_debug("Stream properly set up, we're ready to roll!");
>>
>> -    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK)
>> +    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
>>          a2dp_set_bitpool(u, u->sbc_info.max_bitpool);
>> +        update_buffer_size(u);
>> +    }
>>
>>      u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
>>      pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
>> @@ -1068,12 +1110,12 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
>>      switch (code) {
>>
>>          case PA_SINK_MESSAGE_GET_LATENCY: {
>> -            int64_t wi, ri;
>> +            int64_t wi = 0, ri = 0;
>>
>>              if (u->read_smoother) {
>>                  ri = pa_smoother_get(u->read_smoother, pa_rtclock_now());
>>                  wi = pa_bytes_to_usec(u->write_index + u->write_block_size, &u->sample_spec);
>> -            } else {
>> +            } else if (u->started_at) {
>>                  ri = pa_rtclock_now() - u->started_at;
>>                  wi = pa_bytes_to_usec(u->write_index, &u->sample_spec);
>>              }
>> @@ -1415,12 +1457,32 @@ static int init_profile(struct userdata *u) {
>>      return r;
>>  }
>>
>> +static int write_block(struct userdata *u) {
>> +    int n_written;
>> +
>> +    if (u->write_index <= 0)
>> +        u->started_at = pa_rtclock_now();
>> +
>> +    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
>> +        if ((n_written = a2dp_process_render(u)) < 0)
>> +            return -1;
>> +    } else {
>> +        if ((n_written = sco_process_render(u)) < 0)
>> +            return -1;
>> +    }
>> +
>> +    if (n_written == 0)
>> +        pa_log_debug("Got EAGAIN on write() after POLLOUT, probably there is a temporary connection loss.");
>> +
>> +    return n_written;
>> +}
>> +
>> +
>>  /* I/O thread function */
>>  static void thread_func(void *userdata) {
>>      struct userdata *u = userdata;
>> -    unsigned do_write = 0;
>> -    unsigned pending_read_bytes = 0;
>> -    bool writable = false;
>> +    unsigned blocks_to_write = 0;
>> +    unsigned bytes_to_write = 0;
>>
>>      pa_assert(u);
>>      pa_assert(u->transport);
>> @@ -1440,9 +1502,13 @@ static void thread_func(void *userdata) {
>>          struct pollfd *pollfd;
>>          int ret;
>>          bool disable_timer = true;
>> +        bool writable = false;
>> +        bool have_source = u->source ? PA_SOURCE_IS_LINKED(u->source->thread_info.state) : false;
>> +        bool have_sink = u->sink ? PA_SINK_IS_LINKED(u->sink->thread_info.state) : false;
>>
>>          pollfd = u->rtpoll_item ? pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL) : NULL;
>>
>> +        /* Check for stream error or close */
>>          if (pollfd && (pollfd->revents & ~(POLLOUT|POLLIN))) {
>>              pa_log_info("FD error: %s%s%s%s",
>>                          pollfd->revents & POLLERR ? "POLLERR " :"",
>> @@ -1453,147 +1519,174 @@ static void thread_func(void *userdata) {
>>              if (pollfd->revents & POLLHUP) {
>>                  pollfd = NULL;
>>                  teardown_stream(u);
>> -                do_write = 0;
>> -                pending_read_bytes = 0;
>> -                writable = false;
>> +                blocks_to_write = 0;
>> +                bytes_to_write = 0;
>>                  pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), BLUETOOTH_MESSAGE_STREAM_FD_HUP, NULL, 0, NULL, NULL);
>>              } else
>>                  goto fail;
>>          }
>>
>> -        if (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state)) {
>> +        /* If there is a pollfd, the stream is set up and we need to do something */
>> +        if (pollfd) {
>>
>> -            /* We should send two blocks to the device before we expect
>> -             * a response. */
>> +            /* Handle source if present */
>> +            if (have_source) {
>>
>> -            if (u->write_index == 0 && u->read_index <= 0)
>> -                do_write = 2;
>> +                /* We should send two blocks to the device before we expect a response. */
>> +                if (u->write_index == 0 && u->read_index <= 0)
>> +                    blocks_to_write = 2;
>>
>> -            if (pollfd && (pollfd->revents & POLLIN)) {
>> -                int n_read;
>> +                /* If we got woken up by POLLIN let's do some reading */
>> +                if (pollfd->revents & POLLIN) {
>> +                    int n_read;
>>
>> -                if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE)
>> -                    n_read = a2dp_process_push(u);
>> -                else
>> -                    n_read = sco_process_push(u);
>> +                    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE)
>> +                        n_read = a2dp_process_push(u);
>> +                    else
>> +                        n_read = sco_process_push(u);
>>
>> -                if (n_read < 0)
>> -                    goto fail;
>> +                    if (n_read < 0)
>> +                        goto fail;
>>
>> -                if (n_read > 0) {
>> -                    /* We just read something, so we are supposed to write something, too */
>> -                    pending_read_bytes += n_read;
>> -                    do_write += pending_read_bytes / u->write_block_size;
>> -                    pending_read_bytes = pending_read_bytes % u->write_block_size;
>> +                    if (n_read > 0) {
>> +                        /* We just read something, so we are supposed to write something, too */
>> +                        bytes_to_write += n_read;
>> +                        blocks_to_write += bytes_to_write / u->write_block_size;
>> +                        bytes_to_write = bytes_to_write % u->write_block_size;
>> +                    }
>>                  }
>>              }
>> -        }
>>
>> -        if (u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
>> +            /* Handle sink if present */
>> +            if (have_sink) {
>>
>> -            if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
>> -                pa_sink_process_rewind(u->sink, 0);
>> +                /* Process rewinds */
>> +                if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
>> +                    pa_sink_process_rewind(u->sink, 0);
>>
>> -            if (pollfd) {
>> +                /* Test if the stream is writable */
>>                  if (pollfd->revents & POLLOUT)
>>                      writable = true;
>>
>> -                if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0 && writable) {
>> -                    pa_usec_t time_passed;
>> -                    pa_usec_t audio_sent;
>> +                /* If we have a source, we let the source determine the timing
>> +                 * for the sink */
>> +                if (have_source) {
>>
>> -                    /* Hmm, there is no input stream we could synchronize
>> -                     * to. So let's do things by time */
>> +                    if (writable && blocks_to_write > 0) {
>> +                        int result;
>>
>> -                    time_passed = pa_rtclock_now() - u->started_at;
>> -                    audio_sent = pa_bytes_to_usec(u->write_index, &u->sample_spec);
>> +                        if ((result = write_block(u)) < 0)
>> +                            goto fail;
>>
>> +                        blocks_to_write -= result;
>> +                        if (blocks_to_write > 0)
>> +                            writable = false;
> This "blocks_to_write > 0" check is new. In the previous version
> writable was set to false unconditionally, and that's how I believe it
> should be done. I guess this is an optimization: we won't write
> anything before we get POLLIN, so to you it seemed unnecessary to wake
> up on POLLOUT. Getting the POLLOUT notification is indeed unnecessary
> in itself, but we still need to set POLLOUT when polling, because
> otherwise when we wake up due to POLLIN, pollfd->revents will not have
> POLLOUT set even if the socket is writable. As a result we won't write
> when we're supposed to.
>
> Or actually...
> if we wake up on POLLIN, and POLLOUT isn't in revents
> even if the socket is writable, we'll set POLLOUT on the subsequent
> poll, and that will return immediately, so there's not much delay with
> the write. So maybe the "blocks_to_write > 0" is fine after all. Some
> kind of a comment would probably be good in any case. For example:
>
> "writable controls whether we set POLLOUT when polling - we set it to
> false to enable POLLOUT. If there are more blocks to write, we want to
> be woken up immediately when the socket becomes writable. If there
> aren't currently any more blocks to write, then we'll have to wait
> until we've received more data, so in that case we only want to set
> POLLIN. Note that when we are woken up the next time, POLLOUT won't be
> set in revents even if the socket has meanwhile become writable, which
> may seem bad, but in that case we'll set POLLOUT in the subsequent
> poll, and the poll will return immediately, so our writes won't be
> delayed."
>

Yes, it is an optimization. It works fine and significantly reduces CPU
load on slow systems, which was the reason for implementing it. I will add
your comment to clarify.
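
For reference, the relationship between "writable" and the poll event mask
discussed above can be sketched in isolation. This is only a minimal
illustration, not code from the patch: next_poll_events() and
writable_after_write() are hypothetical helpers, and the way the event mask
is derived from "writable" is an assumption based on the proposed comment
(POLLOUT is requested only while writable is false).

```c
#include <assert.h>
#include <poll.h>
#include <stdbool.h>

/* Hypothetical helper (not in the patch): event mask for the next poll().
 * Assumption: POLLOUT is requested only while "writable" is false, as the
 * proposed comment describes; POLLIN is requested whenever a source exists. */
static short next_poll_events(bool have_source, bool have_sink, bool writable) {
    short events = 0;

    if (have_source)
        events |= POLLIN;
    if (have_sink && !writable)
        events |= POLLOUT;

    return events;
}

/* Hypothetical helper mirroring the have_source branch of the patch: account
 * for the blocks just written and return the new value of "writable". */
static bool writable_after_write(unsigned *blocks_to_write, unsigned n_written) {
    assert(n_written <= *blocks_to_write);
    *blocks_to_write -= n_written;

    /* Blocks still pending: clear writable so that POLLOUT is requested and
     * we wake up as soon as the socket can take more data. Nothing pending:
     * keep writable set, so only POLLIN wakes us up. */
    return *blocks_to_write == 0;
}
```

With two blocks pending, writing one of them leaves writable false and the
next poll waits for POLLIN|POLLOUT. After the second write nothing is
pending, writable stays true and the thread only wakes up on POLLIN, which
is where the reduced CPU load comes from.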