From: Jo?o Paulo Rechi Vita <jprvita@xxxxxxxxxxxxx> Create the thread function, the render and push functions for A2DP, the process message function for communication between the I/O thread and the main thread, and other helper functions related to them. --- src/modules/bluetooth/module-bluez5-device.c | 638 +++++++++++++++++++++++++++ 1 file changed, 638 insertions(+) diff --git a/src/modules/bluetooth/module-bluez5-device.c b/src/modules/bluetooth/module-bluez5-device.c index 7fbdc0e..010e3e2 100644 --- a/src/modules/bluetooth/module-bluez5-device.c +++ b/src/modules/bluetooth/module-bluez5-device.c @@ -24,19 +24,29 @@ #include <config.h> #endif +#include <errno.h> + +#include <arpa/inet.h> #include <sbc/sbc.h> +#include <pulse/rtclock.h> +#include <pulse/timeval.h> + +#include <pulsecore/core-error.h> #include <pulsecore/core-util.h> #include <pulsecore/i18n.h> #include <pulsecore/module.h> #include <pulsecore/modargs.h> #include <pulsecore/poll.h> #include <pulsecore/rtpoll.h> +#include <pulsecore/socket-util.h> #include <pulsecore/thread.h> #include <pulsecore/thread-mq.h> +#include <pulsecore/time-smoother.h> #include "a2dp-codecs.h" #include "bluez5-util.h" +#include "rtp.h" #include "module-bluez5-device-symdef.h" @@ -46,11 +56,30 @@ PA_MODULE_VERSION(PACKAGE_VERSION); PA_MODULE_LOAD_ONCE(false); PA_MODULE_USAGE("path=<device object path>"); +#define MAX_PLAYBACK_CATCH_UP_USEC (100 * PA_USEC_PER_MSEC) +#define FIXED_LATENCY_PLAYBACK_A2DP (25 * PA_USEC_PER_MSEC) +#define FIXED_LATENCY_RECORD_A2DP (25 * PA_USEC_PER_MSEC) + +#define BITPOOL_DEC_LIMIT 32 +#define BITPOOL_DEC_STEP 5 + static const char* const valid_modargs[] = { "path", NULL }; +enum { + BLUETOOTH_MESSAGE_IO_THREAD_FAILED, + BLUETOOTH_MESSAGE_MAX +}; + +typedef struct bluetooth_msg { + pa_msgobject parent; + pa_card *card; +} bluetooth_msg; +PA_DEFINE_PRIVATE_CLASS(bluetooth_msg, pa_msgobject); +#define BLUETOOTH_MSG(o) (bluetooth_msg_cast(o)) + typedef struct sbc_info { sbc_t sbc; /* Codec data */ bool sbc_initialized; /* Keep track if the encoder is initialized */ @@ -86,12 +115,19 @@ struct userdata { pa_thread_mq thread_mq; pa_rtpoll *rtpoll; pa_rtpoll_item *rtpoll_item; + bluetooth_msg *msg; int stream_fd; + int stream_write_type; size_t read_link_mtu; size_t write_link_mtu; size_t read_block_size; size_t write_block_size; + uint64_t read_index; + uint64_t write_index; + pa_usec_t started_at; + pa_smoother *read_smoother; + pa_memchunk write_memchunk; pa_sample_spec sample_spec; struct sbc_info sbc_info; }; @@ -197,6 +233,320 @@ static void connect_ports(struct userdata *u, void *new_data, pa_direction_t dir } } +/* Run from IO thread */ +static void a2dp_prepare_buffer(struct userdata *u) { + size_t min_buffer_size = PA_MAX(u->read_link_mtu, u->write_link_mtu); + + pa_assert(u); + + if (u->sbc_info.buffer_size >= min_buffer_size) + return; + + u->sbc_info.buffer_size = 2 * min_buffer_size; + pa_xfree(u->sbc_info.buffer); + u->sbc_info.buffer = pa_xmalloc(u->sbc_info.buffer_size); +} + +/* Run from IO thread */ +static int a2dp_process_render(struct userdata *u) { + struct sbc_info *sbc_info; + struct rtp_header *header; + struct rtp_payload *payload; + size_t nbytes; + void *d; + const void *p; + size_t to_write, to_encode; + unsigned frame_count; + int ret = 0; + + pa_assert(u); + pa_assert(u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK); + pa_assert(u->sink); + + /* First, render some data */ + if (!u->write_memchunk.memblock) + pa_sink_render_full(u->sink, u->write_block_size, &u->write_memchunk); + + pa_assert(u->write_memchunk.length == u->write_block_size); + + a2dp_prepare_buffer(u); + + sbc_info = &u->sbc_info; + header = sbc_info->buffer; + payload = (struct rtp_payload*) ((uint8_t*) sbc_info->buffer + sizeof(*header)); + + frame_count = 0; + + /* Try to create a packet of the full MTU */ + + p = (const uint8_t *) pa_memblock_acquire_chunk(&u->write_memchunk); + to_encode = u->write_memchunk.length; + + d = (uint8_t*) sbc_info->buffer + sizeof(*header) + sizeof(*payload); + to_write = sbc_info->buffer_size - sizeof(*header) - sizeof(*payload); + + while (PA_LIKELY(to_encode > 0 && to_write > 0)) { + ssize_t written; + ssize_t encoded; + + encoded = sbc_encode(&sbc_info->sbc, + p, to_encode, + d, to_write, + &written); + + if (PA_UNLIKELY(encoded <= 0)) { + pa_log_error("SBC encoding error (%li)", (long) encoded); + pa_memblock_release(u->write_memchunk.memblock); + return -1; + } + + pa_assert_fp((size_t) encoded <= to_encode); + pa_assert_fp((size_t) encoded == sbc_info->codesize); + + pa_assert_fp((size_t) written <= to_write); + pa_assert_fp((size_t) written == sbc_info->frame_length); + + p = (const uint8_t*) p + encoded; + to_encode -= encoded; + + d = (uint8_t*) d + written; + to_write -= written; + + frame_count++; + } + + pa_memblock_release(u->write_memchunk.memblock); + + pa_assert(to_encode == 0); + + PA_ONCE_BEGIN { + pa_log_debug("Using SBC encoder implementation: %s", pa_strnull(sbc_get_implementation_info(&sbc_info->sbc))); + } PA_ONCE_END; + + /* write it to the fifo */ + memset(sbc_info->buffer, 0, sizeof(*header) + sizeof(*payload)); + header->v = 2; + header->pt = 1; + header->sequence_number = htons(sbc_info->seq_num++); + header->timestamp = htonl(u->write_index / pa_frame_size(&u->sample_spec)); + header->ssrc = htonl(1); + payload->frame_count = frame_count; + + nbytes = (uint8_t*) d - (uint8_t*) sbc_info->buffer; + + for (;;) { + ssize_t l; + + l = pa_write(u->stream_fd, sbc_info->buffer, nbytes, &u->stream_write_type); + + pa_assert(l != 0); + + if (l < 0) { + + if (errno == EINTR) + /* Retry right away if we got interrupted */ + continue; + + else if (errno == EAGAIN) + /* Hmm, apparently the socket was not writable, give up for now */ + break; + + pa_log_error("Failed to write data to socket: %s", pa_cstrerror(errno)); + ret = -1; + break; + } + + pa_assert((size_t) l <= nbytes); + + if ((size_t) l != nbytes) { + pa_log_warn("Wrote memory block to socket only partially! %llu written, wanted to write %llu.", + (unsigned long long) l, + (unsigned long long) nbytes); + ret = -1; + break; + } + + u->write_index += (uint64_t) u->write_memchunk.length; + pa_memblock_unref(u->write_memchunk.memblock); + pa_memchunk_reset(&u->write_memchunk); + + ret = 1; + + break; + } + + return ret; +} + +/* Run from IO thread */ +static int a2dp_process_push(struct userdata *u) { + int ret = 0; + pa_memchunk memchunk; + + pa_assert(u); + pa_assert(u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE); + pa_assert(u->source); + pa_assert(u->read_smoother); + + memchunk.memblock = pa_memblock_new(u->core->mempool, u->read_block_size); + memchunk.index = memchunk.length = 0; + + for (;;) { + bool found_tstamp = false; + pa_usec_t tstamp; + struct sbc_info *sbc_info; + struct rtp_header *header; + struct rtp_payload *payload; + const void *p; + void *d; + ssize_t l; + size_t to_write, to_decode; + + a2dp_prepare_buffer(u); + + sbc_info = &u->sbc_info; + header = sbc_info->buffer; + payload = (struct rtp_payload*) ((uint8_t*) sbc_info->buffer + sizeof(*header)); + + l = pa_read(u->stream_fd, sbc_info->buffer, sbc_info->buffer_size, &u->stream_write_type); + + if (l <= 0) { + + if (l < 0 && errno == EINTR) + /* Retry right away if we got interrupted */ + continue; + + else if (l < 0 && errno == EAGAIN) + /* Hmm, apparently the socket was not readable, give up for now. */ + break; + + pa_log_error("Failed to read data from socket: %s", l < 0 ? pa_cstrerror(errno) : "EOF"); + ret = -1; + break; + } + + pa_assert((size_t) l <= sbc_info->buffer_size); + + u->read_index += (uint64_t) l; + + /* TODO: get timestamp from rtp */ + if (!found_tstamp) { + /* pa_log_warn("Couldn't find SO_TIMESTAMP data in auxiliary recvmsg() data!"); */ + tstamp = pa_rtclock_now(); + } + + pa_smoother_put(u->read_smoother, tstamp, pa_bytes_to_usec(u->read_index, &u->sample_spec)); + pa_smoother_resume(u->read_smoother, tstamp, true); + + p = (uint8_t*) sbc_info->buffer + sizeof(*header) + sizeof(*payload); + to_decode = l - sizeof(*header) - sizeof(*payload); + + d = pa_memblock_acquire(memchunk.memblock); + to_write = memchunk.length = pa_memblock_get_length(memchunk.memblock); + + while (PA_LIKELY(to_decode > 0)) { + size_t written; + ssize_t decoded; + + decoded = sbc_decode(&sbc_info->sbc, + p, to_decode, + d, to_write, + &written); + + if (PA_UNLIKELY(decoded <= 0)) { + pa_log_error("SBC decoding error (%li)", (long) decoded); + pa_memblock_release(memchunk.memblock); + pa_memblock_unref(memchunk.memblock); + return -1; + } + + /* Reset frame length, it can be changed due to bitpool change */ + sbc_info->frame_length = sbc_get_frame_length(&sbc_info->sbc); + + pa_assert_fp((size_t) decoded <= to_decode); + pa_assert_fp((size_t) decoded == sbc_info->frame_length); + + pa_assert_fp((size_t) written == sbc_info->codesize); + + p = (const uint8_t*) p + decoded; + to_decode -= decoded; + + d = (uint8_t*) d + written; + to_write -= written; + } + + memchunk.length -= to_write; + + pa_memblock_release(memchunk.memblock); + + pa_source_post(u->source, &memchunk); + + ret = l; + break; + } + + pa_memblock_unref(memchunk.memblock); + + return ret; +} + +/* Run from I/O thread */ +static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) { + struct sbc_info *sbc_info; + + pa_assert(u); + + sbc_info = &u->sbc_info; + + if (sbc_info->sbc.bitpool == bitpool) + return; + + if (bitpool > sbc_info->max_bitpool) + bitpool = sbc_info->max_bitpool; + else if (bitpool < sbc_info->min_bitpool) + bitpool = sbc_info->min_bitpool; + + sbc_info->sbc.bitpool = bitpool; + + sbc_info->codesize = sbc_get_codesize(&sbc_info->sbc); + sbc_info->frame_length = sbc_get_frame_length(&sbc_info->sbc); + + pa_log_debug("Bitpool has changed to %u", sbc_info->sbc.bitpool); + + u->read_block_size = + (u->read_link_mtu - sizeof(struct rtp_header) - sizeof(struct rtp_payload)) + / sbc_info->frame_length * sbc_info->codesize; + + u->write_block_size = + (u->write_link_mtu - sizeof(struct rtp_header) - sizeof(struct rtp_payload)) + / sbc_info->frame_length * sbc_info->codesize; + + pa_sink_set_max_request_within_thread(u->sink, u->write_block_size); + pa_sink_set_fixed_latency_within_thread(u->sink, + FIXED_LATENCY_PLAYBACK_A2DP + pa_bytes_to_usec(u->write_block_size, &u->sample_spec)); +} + +/* Run from I/O thread */ +static void a2dp_reduce_bitpool(struct userdata *u) { + struct sbc_info *sbc_info; + uint8_t bitpool; + + pa_assert(u); + + sbc_info = &u->sbc_info; + + /* Check if bitpool is already at its limit */ + if (sbc_info->sbc.bitpool <= BITPOOL_DEC_LIMIT) + return; + + bitpool = sbc_info->sbc.bitpool - BITPOOL_DEC_STEP; + + if (bitpool < BITPOOL_DEC_LIMIT) + bitpool = BITPOOL_DEC_LIMIT; + + a2dp_set_bitpool(u, bitpool); +} + static void teardown_stream(struct userdata *u) { if (u->rtpoll_item) { pa_rtpoll_item_free(u->rtpoll_item); @@ -208,6 +558,16 @@ static void teardown_stream(struct userdata *u) { u->stream_fd = -1; } + if (u->read_smoother) { + pa_smoother_free(u->read_smoother); + u->read_smoother = NULL; + } + + if (u->write_memchunk.memblock) { + pa_memblock_unref(u->write_memchunk.memblock); + pa_memchunk_reset(&u->write_memchunk); + } + pa_log_debug("Audio stream torn down"); } @@ -245,6 +605,62 @@ static void transport_release(struct userdata *u) { teardown_stream(u); } +/* Run from I/O thread */ +static void transport_config_mtu(struct userdata *u) { + u->read_block_size = + (u->read_link_mtu - sizeof(struct rtp_header) - sizeof(struct rtp_payload)) + / u->sbc_info.frame_length * u->sbc_info.codesize; + + u->write_block_size = + (u->write_link_mtu - sizeof(struct rtp_header) - sizeof(struct rtp_payload)) + / u->sbc_info.frame_length * u->sbc_info.codesize; + + if (u->sink) { + pa_sink_set_max_request_within_thread(u->sink, u->write_block_size); + pa_sink_set_fixed_latency_within_thread(u->sink, + FIXED_LATENCY_PLAYBACK_A2DP + + pa_bytes_to_usec(u->write_block_size, &u->sample_spec)); + } + + if (u->source) + pa_source_set_fixed_latency_within_thread(u->source, + FIXED_LATENCY_RECORD_A2DP + + pa_bytes_to_usec(u->read_block_size, &u->sample_spec)); +} + +/* Run from I/O thread */ +static void setup_stream(struct userdata *u) { + struct pollfd *pollfd; + int one; + + pa_log_info("Transport %s resuming", u->transport->path); + + transport_config_mtu(u); + + pa_make_fd_nonblock(u->stream_fd); + pa_make_socket_low_delay(u->stream_fd); + + one = 1; + if (setsockopt(u->stream_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) + pa_log_warn("Failed to enable SO_TIMESTAMP: %s", pa_cstrerror(errno)); + + pa_log_debug("Stream properly set up, we're ready to roll!"); + + if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) + a2dp_set_bitpool(u, u->sbc_info.max_bitpool); + + u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1); + pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); + pollfd->fd = u->stream_fd; + pollfd->events = pollfd->revents = 0; + + u->read_index = u->write_index = 0; + u->started_at = 0; + + if (u->source) + u->read_smoother = pa_smoother_new(PA_USEC_PER_SEC, 2*PA_USEC_PER_SEC, true, true, 10, pa_rtclock_now(), true); +} + /* Run from main thread */ static int add_source(struct userdata *u) { pa_source_new_data data; @@ -484,6 +900,194 @@ static int init_profile(struct userdata *u) { /* I/O thread function */ static void thread_func(void *userdata) { + struct userdata *u = userdata; + unsigned do_write = 0; + unsigned pending_read_bytes = 0; + bool writable = false; + + pa_assert(u); + pa_assert(u->transport); + + pa_log_debug("IO Thread starting up"); + + if (u->core->realtime_scheduling) + pa_make_realtime(u->core->realtime_priority); + + pa_thread_mq_install(&u->thread_mq); + + /* Setup the stream only if the transport was already acquired */ + if (u->transport_acquired) + setup_stream(u); + + for (;;) { + struct pollfd *pollfd; + int ret; + bool disable_timer = true; + + pollfd = u->rtpoll_item ? pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL) : NULL; + + if (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state)) { + + /* We should send two blocks to the device before we expect + * a response. */ + + if (u->write_index == 0 && u->read_index <= 0) + do_write = 2; + + if (pollfd && (pollfd->revents & POLLIN)) { + int n_read; + + n_read = a2dp_process_push(u); + + if (n_read < 0) + goto io_fail; + + /* We just read something, so we are supposed to write something, too */ + pending_read_bytes += n_read; + do_write += pending_read_bytes / u->write_block_size; + pending_read_bytes = pending_read_bytes % u->write_block_size; + } + } + + if (u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state)) { + + if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) + pa_sink_process_rewind(u->sink, 0); + + if (pollfd) { + if (pollfd->revents & POLLOUT) + writable = true; + + if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0 && writable) { + pa_usec_t time_passed; + pa_usec_t audio_sent; + + /* Hmm, there is no input stream we could synchronize + * to. So let's do things by time */ + + time_passed = pa_rtclock_now() - u->started_at; + audio_sent = pa_bytes_to_usec(u->write_index, &u->sample_spec); + + if (audio_sent <= time_passed) { + pa_usec_t audio_to_send = time_passed - audio_sent; + + /* Never try to catch up for more than 100ms */ + if (u->write_index > 0 && audio_to_send > MAX_PLAYBACK_CATCH_UP_USEC) { + pa_usec_t skip_usec; + uint64_t skip_bytes; + + skip_usec = audio_to_send - MAX_PLAYBACK_CATCH_UP_USEC; + skip_bytes = pa_usec_to_bytes(skip_usec, &u->sample_spec); + + if (skip_bytes > 0) { + pa_memchunk tmp; + + pa_log_warn("Skipping %llu us (= %llu bytes) in audio stream", + (unsigned long long) skip_usec, + (unsigned long long) skip_bytes); + + pa_sink_render_full(u->sink, skip_bytes, &tmp); + pa_memblock_unref(tmp.memblock); + u->write_index += skip_bytes; + + if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) + a2dp_reduce_bitpool(u); + } + } + + do_write = 1; + pending_read_bytes = 0; + } + } + + if (writable && do_write > 0) { + int n_written; + + if (u->write_index <= 0) + u->started_at = pa_rtclock_now(); + + if ((n_written = a2dp_process_render(u)) < 0) + goto io_fail; + + if (n_written == 0) + pa_log("Broken kernel: we got EAGAIN on write() after POLLOUT!"); + + do_write -= n_written; + writable = false; + } + + if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0) { + pa_usec_t sleep_for; + pa_usec_t time_passed, next_write_at; + + if (writable) { + /* Hmm, there is no input stream we could synchronize + * to. So let's estimate when we need to wake up the latest */ + time_passed = pa_rtclock_now() - u->started_at; + next_write_at = pa_bytes_to_usec(u->write_index, &u->sample_spec); + sleep_for = time_passed < next_write_at ? next_write_at - time_passed : 0; + /* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */ + } else + /* drop stream every 500 ms */ + sleep_for = PA_USEC_PER_MSEC * 500; + + pa_rtpoll_set_timer_relative(u->rtpoll, sleep_for); + disable_timer = false; + } + } + } + + if (disable_timer) + pa_rtpoll_set_timer_disabled(u->rtpoll); + + /* Hmm, nothing to do. Let's sleep */ + if (pollfd) + pollfd->events = (short) (((u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state) && !writable) ? POLLOUT : 0) | + (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state) ? POLLIN : 0)); + + if ((ret = pa_rtpoll_run(u->rtpoll, true)) < 0) { + pa_log_debug("pa_rtpoll_run failed with: %d", ret); + goto fail; + } + if (ret == 0) { + pa_log_debug("IO thread shutdown requested, stopping cleanly"); + transport_release(u); + goto finish; + } + + pollfd = u->rtpoll_item ? pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL) : NULL; + + if (pollfd && (pollfd->revents & ~(POLLOUT|POLLIN))) { + pa_log_info("FD error: %s%s%s%s", + pollfd->revents & POLLERR ? "POLLERR " :"", + pollfd->revents & POLLHUP ? "POLLHUP " :"", + pollfd->revents & POLLPRI ? "POLLPRI " :"", + pollfd->revents & POLLNVAL ? "POLLNVAL " :""); + goto io_fail; + } + + continue; + +io_fail: + /* In case of HUP, just tear down the streams */ + if (!pollfd || (pollfd->revents & POLLHUP) == 0) + goto fail; + + do_write = 0; + pending_read_bytes = 0; + writable = false; + + teardown_stream(u); + } + +fail: + /* If this was no regular exit from the loop we have to continue processing messages until we receive PA_MESSAGE_SHUTDOWN */ + pa_log_debug("IO thread failed"); + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), BLUETOOTH_MESSAGE_IO_THREAD_FAILED, NULL, 0, NULL, NULL); + pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); + +finish: + pa_log_debug("IO thread shutting down"); } /* Run from main thread */ @@ -563,6 +1167,11 @@ static void stop_thread(struct userdata *u) { pa_source_unref(u->source); u->source = NULL; } + + if (u->read_smoother) { + pa_smoother_free(u->read_smoother); + u->read_smoother = NULL; + } } /* Run from main thread */ @@ -923,6 +1532,23 @@ static pa_hook_result_t device_connection_changed_cb(pa_bluetooth_discovery *y, return PA_HOOK_OK; } +/* Run from main thread context */ +static int device_process_msg(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) { + struct bluetooth_msg *u = BLUETOOTH_MSG(obj); + + switch (code) { + case BLUETOOTH_MESSAGE_IO_THREAD_FAILED: + if (u->card->module->unload_requested) + break; + + pa_log_debug("Switching the profile to off due to IO thread failure."); + pa_assert_se(pa_card_set_profile(u->card, "off", false) >= 0); + break; + } + + return 0; +} + int pa__init(pa_module* m) { struct userdata *u; const char *path; @@ -958,6 +1584,12 @@ int pa__init(pa_module* m) { if (add_card(u) < 0) goto fail; + if (!(u->msg = pa_msgobject_new(bluetooth_msg))) + goto fail; + + u->msg->parent.process_msg = device_process_msg; + u->msg->card = u->card; + if (u->profile != PA_BLUETOOTH_PROFILE_OFF) if (init_profile(u) < 0) goto off; @@ -995,9 +1627,15 @@ void pa__done(pa_module *m) { if (u->device_connection_changed_slot) pa_hook_slot_free(u->device_connection_changed_slot); + if (u->sbc_info.buffer) + pa_xfree(u->sbc_info.buffer); + if (u->sbc_info.sbc_initialized) sbc_finish(&u->sbc_info.sbc); + if (u->msg) + pa_xfree(u->msg); + if (u->card) pa_card_free(u->card); -- 1.8.3.1