On Thu, Jul 25, 2013 at 6:29 AM, Tanu Kaskinen <tanu.kaskinen at linux.intel.com> wrote: > On Fri, 2013-07-12 at 15:07 -0300, jprvita at gmail.com wrote: >> From: João Paulo Rechi Vita <jprvita at openbossa.org> >> >> Create the thread function, the render and push functions for A2DP, the >> process message function for communication between the I/O thread and >> the main thread, and other helper functions related to them. >> --- >> src/modules/bluetooth/module-bluez5-device.c | 637 +++++++++++++++++++++++++++ >> 1 file changed, 637 insertions(+) >> >> diff --git a/src/modules/bluetooth/module-bluez5-device.c b/src/modules/bluetooth/module-bluez5-device.c >> index d0971d5..9a5534c 100644 >> --- a/src/modules/bluetooth/module-bluez5-device.c >> +++ b/src/modules/bluetooth/module-bluez5-device.c >> @@ -24,19 +24,29 @@ >> #include <config.h> >> #endif >> >> +#include <errno.h> >> + >> +#include <arpa/inet.h> >> #include <sbc/sbc.h> >> >> +#include <pulse/rtclock.h> >> +#include <pulse/timeval.h> >> + >> +#include <pulsecore/core-error.h> >> #include <pulsecore/core-util.h> >> #include <pulsecore/i18n.h> >> #include <pulsecore/module.h> >> #include <pulsecore/modargs.h> >> #include <pulsecore/poll.h> >> #include <pulsecore/rtpoll.h> >> +#include <pulsecore/socket-util.h> >> #include <pulsecore/thread.h> >> #include <pulsecore/thread-mq.h> >> +#include <pulsecore/time-smoother.h> >> >> #include "a2dp-codecs.h" >> #include "bluez5-util.h" >> +#include "rtp.h" >> >> #include "module-bluez5-device-symdef.h" >> >> @@ -46,11 +56,30 @@ PA_MODULE_VERSION(PACKAGE_VERSION); >> PA_MODULE_LOAD_ONCE(false); >> PA_MODULE_USAGE("path=<device object path>"); >> >> +#define MAX_PLAYBACK_CATCH_UP_USEC (100 * PA_USEC_PER_MSEC) >> +#define FIXED_LATENCY_PLAYBACK_A2DP (25 * PA_USEC_PER_MSEC) >> +#define FIXED_LATENCY_RECORD_A2DP (25 * PA_USEC_PER_MSEC) >> + >> +#define BITPOOL_DEC_LIMIT 32 >> +#define BITPOOL_DEC_STEP 5 >> + >> static const char* const valid_modargs[] = { >> "path", >> 
NULL >> }; >> >> +enum { >> + BLUETOOTH_MESSAGE_IO_THREAD_FAILED, >> + BLUETOOTH_MESSAGE_MAX >> +}; >> + >> +typedef struct bluetooth_msg { >> + pa_msgobject parent; >> + pa_card *card; >> +} bluetooth_msg; >> +PA_DEFINE_PRIVATE_CLASS(bluetooth_msg, pa_msgobject); >> +#define BLUETOOTH_MSG(o) (bluetooth_msg_cast(o)) >> + >> typedef struct sbc_info { >> sbc_t sbc; /* Codec data */ >> bool sbc_initialized; /* Keep track if the encoder is initialized */ >> @@ -86,12 +115,19 @@ struct userdata { >> pa_thread_mq thread_mq; >> pa_rtpoll *rtpoll; >> pa_rtpoll_item *rtpoll_item; >> + bluetooth_msg *msg; >> >> int stream_fd; >> + int stream_write_type; >> size_t read_link_mtu; >> size_t write_link_mtu; >> size_t read_block_size; >> size_t write_block_size; >> + uint64_t read_index; >> + uint64_t write_index; >> + pa_usec_t started_at; >> + pa_smoother *read_smoother; >> + pa_memchunk write_memchunk; >> pa_sample_spec sample_spec; >> struct sbc_info sbc_info; >> }; >> @@ -210,6 +246,320 @@ static void connect_ports(struct userdata *u, void *new_data, pa_direction_t dir >> } >> } >> >> +/* Run from IO thread */ >> +static void a2dp_prepare_buffer(struct userdata *u) { >> + size_t min_buffer_size = PA_MAX(u->read_link_mtu, u->write_link_mtu); >> + >> + pa_assert(u); >> + >> + if (u->sbc_info.buffer_size >= min_buffer_size) >> + return; >> + >> + u->sbc_info.buffer_size = 2 * min_buffer_size; >> + pa_xfree(u->sbc_info.buffer); >> + u->sbc_info.buffer = pa_xmalloc(u->sbc_info.buffer_size); >> +} >> + >> +/* Run from IO thread */ >> +static int a2dp_process_render(struct userdata *u) { >> + struct sbc_info *sbc_info; >> + struct rtp_header *header; >> + struct rtp_payload *payload; >> + size_t nbytes; >> + void *d; >> + const void *p; >> + size_t to_write, to_encode; >> + unsigned frame_count; >> + int ret = 0; >> + >> + pa_assert(u); >> + pa_assert(u->profile == PROFILE_A2DP_SINK); >> + pa_assert(u->sink); >> + >> + /* First, render some data */ >> + if 
(!u->write_memchunk.memblock) >> + pa_sink_render_full(u->sink, u->write_block_size, &u->write_memchunk); >> + >> + pa_assert(u->write_memchunk.length == u->write_block_size); > > u->write_block_size may change due to bitpool reduction. If there was > audio left in write_memchunk due to an earlier EAGAIN error, this > assertion will crash. a2dp_process_render() needs to prepare for > write_block_size changing between calls. > >> + >> + a2dp_prepare_buffer(u); >> + >> + sbc_info = &u->sbc_info; >> + header = sbc_info->buffer; >> + payload = (struct rtp_payload*) ((uint8_t*) sbc_info->buffer + sizeof(*header)); >> + >> + frame_count = 0; >> + >> + /* Try to create a packet of the full MTU */ >> + >> + p = (const uint8_t *) pa_memblock_acquire_chunk(&u->write_memchunk); >> + to_encode = u->write_memchunk.length; >> + >> + d = (uint8_t*) sbc_info->buffer + sizeof(*header) + sizeof(*payload); >> + to_write = sbc_info->buffer_size - sizeof(*header) - sizeof(*payload); > > This assumes that sbc_info->buffer_size is big enough to hold at least > rtp_header and rtp_payload structs. buffer_size is determined by the > MTU, and there seems to be no check that the MTU values are sensible. > >> + >> + while (PA_LIKELY(to_encode > 0 && to_write > 0)) { >> + ssize_t written; >> + ssize_t encoded; >> + >> + encoded = sbc_encode(&sbc_info->sbc, >> + p, to_encode, >> + d, to_write, >> + &written); >> + >> + if (PA_UNLIKELY(encoded <= 0)) { >> + pa_log_error("SBC encoding error (%li)", (long) encoded); >> + pa_memblock_release(u->write_memchunk.memblock); >> + return -1; >> + } >> + >> + pa_assert_fp((size_t) encoded <= to_encode); >> + pa_assert_fp((size_t) encoded == sbc_info->codesize); >> + >> + pa_assert_fp((size_t) written <= to_write); >> + pa_assert_fp((size_t) written == sbc_info->frame_length); > > These equality assertions make me nervous. 
I suggest adding a comment: > > /* These assertions are safe, because sbc_encode() encodes exactly one > * input block to exactly one output block (documented). If the buffer > * sizes are too small, sbc_encode() returns zero or an error (not > * documented), which we catch above. */ > pa_assert_fp((size_t) encoded <= to_encode); > pa_assert_fp((size_t) encoded == sbc_info->codesize); > > pa_assert_fp((size_t) written <= to_write); > pa_assert_fp((size_t) written == sbc_info->frame_length); > >> + >> + p = (const uint8_t*) p + encoded; >> + to_encode -= encoded; >> + >> + d = (uint8_t*) d + written; >> + to_write -= written; >> + >> + frame_count++; >> + } >> + >> + pa_memblock_release(u->write_memchunk.memblock); >> + >> + pa_assert(to_encode == 0); >> + >> + PA_ONCE_BEGIN { >> + pa_log_debug("Using SBC encoder implementation: %s", pa_strnull(sbc_get_implementation_info(&sbc_info->sbc))); >> + } PA_ONCE_END; >> + >> + /* write it to the fifo */ >> + memset(sbc_info->buffer, 0, sizeof(*header) + sizeof(*payload)); >> + header->v = 2; >> + header->pt = 1; >> + header->sequence_number = htons(sbc_info->seq_num++); >> + header->timestamp = htonl(u->write_index / pa_frame_size(&u->sample_spec)); >> + header->ssrc = htonl(1); >> + payload->frame_count = frame_count; >> + >> + nbytes = (uint8_t*) d - (uint8_t*) sbc_info->buffer; >> + >> + for (;;) { >> + ssize_t l; >> + >> + l = pa_write(u->stream_fd, sbc_info->buffer, nbytes, &u->stream_write_type); >> + >> + pa_assert(l != 0); >> + >> + if (l < 0) { >> + >> + if (errno == EINTR) >> + /* Retry right away if we got interrupted */ >> + continue; >> + >> + else if (errno == EAGAIN) >> + /* Hmm, apparently the socket was not writable, give up for now */ >> + break; >> + >> + pa_log_error("Failed to write data to socket: %s", pa_cstrerror(errno)); >> + ret = -1; >> + break; >> + } >> + >> + pa_assert((size_t) l <= nbytes); >> + >> + if ((size_t) l != nbytes) { >> + pa_log_warn("Wrote memory block to socket only partially! 
%llu written, wanted to write %llu.", >> + (unsigned long long) l, >> + (unsigned long long) nbytes); >> + ret = -1; >> + break; >> + } >> + >> + u->write_index += (uint64_t) u->write_memchunk.length; >> + pa_memblock_unref(u->write_memchunk.memblock); >> + pa_memchunk_reset(&u->write_memchunk); >> + >> + ret = 1; >> + >> + break; >> + } >> + >> + return ret; >> +} >> + >> +/* Run from IO thread */ >> +static int a2dp_process_push(struct userdata *u) { >> + int ret = 0; >> + pa_memchunk memchunk; >> + >> + pa_assert(u); >> + pa_assert(u->profile == PROFILE_A2DP_SOURCE); >> + pa_assert(u->source); >> + pa_assert(u->read_smoother); >> + >> + memchunk.memblock = pa_memblock_new(u->core->mempool, u->read_block_size); >> + memchunk.index = memchunk.length = 0; >> + >> + for (;;) { >> + bool found_tstamp = false; >> + pa_usec_t tstamp; >> + struct sbc_info *sbc_info; >> + struct rtp_header *header; >> + struct rtp_payload *payload; >> + const void *p; >> + void *d; >> + ssize_t l; >> + size_t to_write, to_decode; >> + >> + a2dp_prepare_buffer(u); >> + >> + sbc_info = &u->sbc_info; >> + header = sbc_info->buffer; >> + payload = (struct rtp_payload*) ((uint8_t*) sbc_info->buffer + sizeof(*header)); >> + >> + l = pa_read(u->stream_fd, sbc_info->buffer, sbc_info->buffer_size, &u->stream_write_type); > > The code assumes later that what we read here is exactly one complete > rtp packet, nothing more, nothing less. I don't understand how such > assumption can be done. I think we should parse the rtp header to find > the packet boundary, and buffer any incomplete packets for later > reading. Or perhaps recvmsg() should be used (I guess that's needed also > for reading the timestamp, which is currently a TODO item). > >> + >> + if (l <= 0) { >> + >> + if (l < 0 && errno == EINTR) >> + /* Retry right away if we got interrupted */ >> + continue; >> + >> + else if (l < 0 && errno == EAGAIN) >> + /* Hmm, apparently the socket was not readable, give up for now. 
*/ >> + break; >> + >> + pa_log_error("Failed to read data from socket: %s", l < 0 ? pa_cstrerror(errno) : "EOF"); >> + ret = -1; >> + break; >> + } >> + >> + pa_assert((size_t) l <= sbc_info->buffer_size); >> + >> + u->read_index += (uint64_t) l; >> + >> + /* TODO: get timestamp from rtp */ >> + if (!found_tstamp) { >> + /* pa_log_warn("Couldn't find SO_TIMESTAMP data in auxiliary recvmsg() data!"); */ >> + tstamp = pa_rtclock_now(); >> + } > > There are two kinds of timestamps being talked about here: RTP timestamps > and socket message timestamps. I don't know what we would use the socket > message timestamps for, but I think supporting RTP timestamps would be > very important, because it would allow us to detect dropped packets. We > should generate silence as a substitute for the lost packets. Not > detecting dropped packets means that our timing information gets messed > up, making it impossible for module-loopback to work properly. > > Are the RTP timestamp semantics defined well enough so that we could > know how much silence we need to generate? We seem to use the number of > written samples in our outgoing packets, is that what we can expect also > from incoming packets? > >> + >> + pa_smoother_put(u->read_smoother, tstamp, pa_bytes_to_usec(u->read_index, &u->sample_spec)); >> + pa_smoother_resume(u->read_smoother, tstamp, true); >> + >> + p = (uint8_t*) sbc_info->buffer + sizeof(*header) + sizeof(*payload); >> + to_decode = l - sizeof(*header) - sizeof(*payload); > > to_decode is unsigned, so this assumes that we read at least > sizeof(*header) + sizeof(*payload) amount of data. That is not checked. 
> >> + >> + d = pa_memblock_acquire(memchunk.memblock); >> + to_write = memchunk.length = pa_memblock_get_length(memchunk.memblock); >> + >> + while (PA_LIKELY(to_decode > 0)) { >> + size_t written; >> + ssize_t decoded; >> + >> + decoded = sbc_decode(&sbc_info->sbc, >> + p, to_decode, >> + d, to_write, >> + &written); >> + >> + if (PA_UNLIKELY(decoded <= 0)) { >> + pa_log_error("SBC decoding error (%li)", (long) decoded); >> + pa_memblock_release(memchunk.memblock); >> + pa_memblock_unref(memchunk.memblock); >> + return -1; >> + } >> + >> + /* Reset frame length, it can be changed due to bitpool change */ >> + sbc_info->frame_length = sbc_get_frame_length(&sbc_info->sbc); > > read_block_size is derived from the frame length, so that needs to be > updated too. And the source fixed latency depends on read_block_size. > And so does the buffer length that was passed to sbc_decode, so it may > happen that we passed too small buffer to sbc_decode(). > >> + >> + pa_assert_fp((size_t) decoded <= to_decode); >> + pa_assert_fp((size_t) decoded == sbc_info->frame_length); >> + >> + pa_assert_fp((size_t) written == sbc_info->codesize); > > This will crash if the output buffer was too small (an undocumented > feature of sbc_decode is that it truncates the output if the given > buffer is too small, instead of failing like sbc_encode() does). > Removing this assertion isn't the right fix, though, the right fix is to > ensure that we have big enough output buffer. 
> >> + >> + p = (const uint8_t*) p + decoded; >> + to_decode -= decoded; >> + >> + d = (uint8_t*) d + written; >> + to_write -= written; >> + } >> + >> + memchunk.length -= to_write; >> + >> + pa_memblock_release(memchunk.memblock); >> + >> + pa_source_post(u->source, &memchunk); >> + >> + ret = l; >> + break; >> + } >> + >> + pa_memblock_unref(memchunk.memblock); >> + >> + return ret; >> +} >> + >> +/* Run from I/O thread */ >> +static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) { >> + struct sbc_info *sbc_info; >> + >> + pa_assert(u); >> + >> + sbc_info = &u->sbc_info; >> + >> + if (sbc_info->sbc.bitpool == bitpool) >> + return; >> + >> + if (bitpool > sbc_info->max_bitpool) >> + bitpool = sbc_info->max_bitpool; >> + else if (bitpool < sbc_info->min_bitpool) >> + bitpool = sbc_info->min_bitpool; >> + >> + sbc_info->sbc.bitpool = bitpool; >> + >> + sbc_info->codesize = sbc_get_codesize(&sbc_info->sbc); >> + sbc_info->frame_length = sbc_get_frame_length(&sbc_info->sbc); >> + >> + pa_log_debug("Bitpool has changed to %u", sbc_info->sbc.bitpool); > > Could you calculate the resulting SBC bitrate and add it to the log > message? I think that would be nice to see in the log, because the > bitpool value doesn't really tell anything meaningful to the vast > majority of people (including me). If you do that, log the bitrate also > in transport_config() and when the bitpool changes in the a2dp_source > mode. 
> >> + >> + u->read_block_size = >> + (u->read_link_mtu - sizeof(struct rtp_header) - sizeof(struct rtp_payload)) >> + / sbc_info->frame_length * sbc_info->codesize; >> + >> + u->write_block_size = >> + (u->write_link_mtu - sizeof(struct rtp_header) - sizeof(struct rtp_payload)) >> + / sbc_info->frame_length * sbc_info->codesize; >> + >> + pa_sink_set_max_request_within_thread(u->sink, u->write_block_size); >> + pa_sink_set_fixed_latency_within_thread(u->sink, >> + FIXED_LATENCY_PLAYBACK_A2DP + pa_bytes_to_usec(u->write_block_size, &u->sample_spec)); >> +} >> + >> +/* Run from I/O thread */ >> +static void a2dp_reduce_bitpool(struct userdata *u) { >> + struct sbc_info *sbc_info; >> + uint8_t bitpool; >> + >> + pa_assert(u); >> + >> + sbc_info = &u->sbc_info; >> + >> + /* Check if bitpool is already at its limit */ >> + if (sbc_info->sbc.bitpool <= BITPOOL_DEC_LIMIT) >> + return; >> + >> + bitpool = sbc_info->sbc.bitpool - BITPOOL_DEC_STEP; >> + >> + if (bitpool < BITPOOL_DEC_LIMIT) >> + bitpool = BITPOOL_DEC_LIMIT; >> + >> + a2dp_set_bitpool(u, bitpool); >> +} >> + >> static void teardown_stream(struct userdata *u) { >> if (u->rtpoll_item) { >> pa_rtpoll_item_free(u->rtpoll_item); >> @@ -221,6 +571,16 @@ static void teardown_stream(struct userdata *u) { >> u->stream_fd = -1; >> } >> >> + if (u->read_smoother) { >> + pa_smoother_free(u->read_smoother); >> + u->read_smoother = NULL; >> + } >> + >> + if (u->write_memchunk.memblock) { >> + pa_memblock_unref(u->write_memchunk.memblock); >> + pa_memchunk_reset(&u->write_memchunk); >> + } >> + >> pa_log_debug("Audio stream torn down"); >> } >> >> @@ -258,6 +618,62 @@ static void transport_release(struct userdata *u) { >> teardown_stream(u); >> } >> >> +/* Run from I/O thread */ >> +static void transport_config_mtu(struct userdata *u) { >> + u->read_block_size = >> + (u->read_link_mtu - sizeof(struct rtp_header) - sizeof(struct rtp_payload)) >> + / u->sbc_info.frame_length * u->sbc_info.codesize; >> + >> + 
u->write_block_size = >> + (u->write_link_mtu - sizeof(struct rtp_header) - sizeof(struct rtp_payload)) >> + / u->sbc_info.frame_length * u->sbc_info.codesize; >> + >> + if (u->sink) { >> + pa_sink_set_max_request_within_thread(u->sink, u->write_block_size); >> + pa_sink_set_fixed_latency_within_thread(u->sink, >> + FIXED_LATENCY_PLAYBACK_A2DP + >> + pa_bytes_to_usec(u->write_block_size, &u->sample_spec)); >> + } >> + >> + if (u->source) >> + pa_source_set_fixed_latency_within_thread(u->source, >> + FIXED_LATENCY_RECORD_A2DP + >> + pa_bytes_to_usec(u->read_block_size, &u->sample_spec)); >> +} >> + >> +/* Run from I/O thread */ >> +static void setup_stream(struct userdata *u) { >> + struct pollfd *pollfd; >> + int one; >> + >> + pa_log_info("Transport %s resuming", u->transport->path); >> + >> + transport_config_mtu(u); >> + >> + pa_make_fd_nonblock(u->stream_fd); >> + pa_make_socket_low_delay(u->stream_fd); >> + >> + one = 1; >> + if (setsockopt(u->stream_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) >> + pa_log_warn("Failed to enable SO_TIMESTAMP: %s", pa_cstrerror(errno)); >> + >> + pa_log_debug("Stream properly set up, we're ready to roll!"); >> + >> + if (u->profile == PROFILE_A2DP_SINK) >> + a2dp_set_bitpool(u, u->sbc_info.max_bitpool); >> + >> + u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1); >> + pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); >> + pollfd->fd = u->stream_fd; >> + pollfd->events = pollfd->revents = 0; >> + >> + u->read_index = u->write_index = 0; >> + u->started_at = 0; >> + >> + if (u->source) >> + u->read_smoother = pa_smoother_new(PA_USEC_PER_SEC, 2*PA_USEC_PER_SEC, true, true, 10, pa_rtclock_now(), true); >> +} >> + >> /* Run from main thread */ >> static int add_source(struct userdata *u) { >> pa_source_new_data data; >> @@ -496,6 +912,194 @@ static int init_profile(struct userdata *u) { >> >> /* I/O thread function */ >> static void thread_func(void *userdata) { >> + struct userdata *u = userdata; 
>> + unsigned do_write = 0; >> + unsigned pending_read_bytes = 0; >> + bool writable = false; >> + >> + pa_assert(u); >> + pa_assert(u->transport); >> + >> + pa_log_debug("IO Thread starting up"); >> + >> + if (u->core->realtime_scheduling) >> + pa_make_realtime(u->core->realtime_priority); >> + >> + pa_thread_mq_install(&u->thread_mq); >> + >> + /* Setup the stream only if the transport was already acquired */ >> + if (u->transport_acquired) >> + setup_stream(u); >> + >> + for (;;) { >> + struct pollfd *pollfd; >> + int ret; >> + bool disable_timer = true; >> + >> + pollfd = u->rtpoll_item ? pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL) : NULL; >> + >> + if (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state)) { >> + >> + /* We should send two blocks to the device before we expect >> + * a response. */ > > What response? I don't understand what this logic is trying to achieve. > >> + >> + if (u->write_index == 0 && u->read_index <= 0) > > Using == 0 with write_index and <= 0 with read_index looks odd. > read_index isn't any more susceptible to be negative than write_index > (both have unsigned type anyway). > >> + do_write = 2; >> + >> + if (pollfd && (pollfd->revents & POLLIN)) { >> + int n_read; >> + >> + n_read = a2dp_process_push(u); >> + >> + if (n_read < 0) >> + goto io_fail; >> + >> + /* We just read something, so we are supposed to write something, too */ >> + pending_read_bytes += n_read; >> + do_write += pending_read_bytes / u->write_block_size; >> + pending_read_bytes = pending_read_bytes % u->write_block_size; > > This is not relevant for A2DP. Although this doesn't actively do any > harm in the A2DP mode either, for readability I think it would be a good > idea to run this code only when it's actually useful. 
> >> + } >> + } >> + >> + if (u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state)) { >> + >> + if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) >> + pa_sink_process_rewind(u->sink, 0); >> + >> + if (pollfd) { >> + if (pollfd->revents & POLLOUT) >> + writable = true; >> + >> + if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0 && writable) { >> + pa_usec_t time_passed; >> + pa_usec_t audio_sent; >> + >> + /* Hmm, there is no input stream we could synchronize >> + * to. So let's do things by time */ >> + >> + time_passed = pa_rtclock_now() - u->started_at; > > u->started_at is 0 in the beginning, and initialized to a sensible value > only at the time of the first write attempt (BTW, I think it should be > initialized only if the write actually succeeds). So in the first round > time_passed will be very large, when it should be zero. > > This doesn't cause practical problems, because skipping is triggered > only if write_index is greater than zero, but I think the code would be > clearer if time_passed would be assigned zero here. > >> + audio_sent = pa_bytes_to_usec(u->write_index, &u->sample_spec); >> + >> + if (audio_sent <= time_passed) { >> + pa_usec_t audio_to_send = time_passed - audio_sent; >> + >> + /* Never try to catch up for more than 100ms */ >> + if (u->write_index > 0 && audio_to_send > MAX_PLAYBACK_CATCH_UP_USEC) { > > Here the "write_index > 0" check wouldn't be needed if time_passed would > always have a sane value. 
> >> + pa_usec_t skip_usec; >> + uint64_t skip_bytes; >> + >> + skip_usec = audio_to_send - MAX_PLAYBACK_CATCH_UP_USEC; >> + skip_bytes = pa_usec_to_bytes(skip_usec, &u->sample_spec); >> + >> + if (skip_bytes > 0) { >> + pa_memchunk tmp; >> + >> + pa_log_warn("Skipping %llu us (= %llu bytes) in audio stream", >> + (unsigned long long) skip_usec, >> + (unsigned long long) skip_bytes); >> + >> + pa_sink_render_full(u->sink, skip_bytes, &tmp); > > write_memchunk may contain buffered audio, which should be consumed > first. I propose that the amount of audio to skip is saved in userdata, > and handled within a2dp_process_render(), because fiddling with > write_memchunk should ideally be done only in a2dp_process_render(). > >> + pa_memblock_unref(tmp.memblock); >> + u->write_index += skip_bytes; >> + >> + if (u->profile == PROFILE_A2DP_SINK) >> + a2dp_reduce_bitpool(u); >> + } >> + } >> + >> + do_write = 1; >> + pending_read_bytes = 0; >> + } >> + } >> + >> + if (writable && do_write > 0) { >> + int n_written; >> + >> + if (u->write_index <= 0) >> + u->started_at = pa_rtclock_now(); >> + >> + if ((n_written = a2dp_process_render(u)) < 0) >> + goto io_fail; >> + >> + if (n_written == 0) >> + pa_log("Broken kernel: we got EAGAIN on write() after POLLOUT!"); >> + >> + do_write -= n_written; >> + writable = false; >> + } >> + >> + if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0) { > > We should enable the timer only if we have already successfully written > something, so this condition should be added: "&& u->write_index > 0". > >> + pa_usec_t sleep_for; >> + pa_usec_t time_passed, next_write_at; >> + >> + if (writable) { >> + /* Hmm, there is no input stream we could synchronize >> + * to. So let's estimate when we need to wake up the latest */ >> + time_passed = pa_rtclock_now() - u->started_at; >> + next_write_at = pa_bytes_to_usec(u->write_index, &u->sample_spec); >> + sleep_for = time_passed < next_write_at ? 
? next_write_at - time_passed : 0; >> + /* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */ >> + } else >> + /* drop stream every 500 ms */ >> + sleep_for = PA_USEC_PER_MSEC * 500; > > I don't think this works. I guess "drop stream" refers to the skipping > functionality, but skipping is only done if the socket is writable. If > the 500 ms timer fires, it means that the socket is not writable > (otherwise we would have woken up due to a POLLOUT event). If dropping > data every 500 ms is something that we want to do, the skipping code > needs to be modified so that it is run also if the socket is not > writable. > > Phew, this one took long. > Yes, and this code is very complex and sensitive, thanks a lot for such a detailed review. But because of all the pending patches in this series and the other ~15 to come that implement HSP/HFP support, I suggest that we merge this piece of code how it is atm, since it has been tested for a long time (it's basically the same code from BlueZ 4 support), and take care of the comments on this patch and 53/56 after the release, what do you think? -- João Paulo Rechi Vita http://about.me/jprvita