On Fri, 2013-07-12 at 15:07 -0300, jprvita at gmail.com wrote: > From: João Paulo Rechi Vita <jprvita at openbossa.org> > > Create the thread function, the render and push functions for A2DP, the > process message function for communication between the I/O thread and > the main thread, and other helper functions related to them. > --- > src/modules/bluetooth/module-bluez5-device.c | 637 +++++++++++++++++++++++++++ > 1 file changed, 637 insertions(+) > > diff --git a/src/modules/bluetooth/module-bluez5-device.c b/src/modules/bluetooth/module-bluez5-device.c > index d0971d5..9a5534c 100644 > --- a/src/modules/bluetooth/module-bluez5-device.c > +++ b/src/modules/bluetooth/module-bluez5-device.c > @@ -24,19 +24,29 @@ > #include <config.h> > #endif > > +#include <errno.h> > + > +#include <arpa/inet.h> > #include <sbc/sbc.h> > > +#include <pulse/rtclock.h> > +#include <pulse/timeval.h> > + > +#include <pulsecore/core-error.h> > #include <pulsecore/core-util.h> > #include <pulsecore/i18n.h> > #include <pulsecore/module.h> > #include <pulsecore/modargs.h> > #include <pulsecore/poll.h> > #include <pulsecore/rtpoll.h> > +#include <pulsecore/socket-util.h> > #include <pulsecore/thread.h> > #include <pulsecore/thread-mq.h> > +#include <pulsecore/time-smoother.h> > > #include "a2dp-codecs.h" > #include "bluez5-util.h" > +#include "rtp.h" > > #include "module-bluez5-device-symdef.h" > > @@ -46,11 +56,30 @@ PA_MODULE_VERSION(PACKAGE_VERSION); > PA_MODULE_LOAD_ONCE(false); > PA_MODULE_USAGE("path=<device object path>"); > > +#define MAX_PLAYBACK_CATCH_UP_USEC (100 * PA_USEC_PER_MSEC) > +#define FIXED_LATENCY_PLAYBACK_A2DP (25 * PA_USEC_PER_MSEC) > +#define FIXED_LATENCY_RECORD_A2DP (25 * PA_USEC_PER_MSEC) > + > +#define BITPOOL_DEC_LIMIT 32 > +#define BITPOOL_DEC_STEP 5 > + > static const char* const valid_modargs[] = { > "path", > NULL > }; > > +enum { > + BLUETOOTH_MESSAGE_IO_THREAD_FAILED, > + BLUETOOTH_MESSAGE_MAX > +}; > + > +typedef struct bluetooth_msg { > + pa_msgobject 
parent; > + pa_card *card; > +} bluetooth_msg; > +PA_DEFINE_PRIVATE_CLASS(bluetooth_msg, pa_msgobject); > +#define BLUETOOTH_MSG(o) (bluetooth_msg_cast(o)) > + > typedef struct sbc_info { > sbc_t sbc; /* Codec data */ > bool sbc_initialized; /* Keep track if the encoder is initialized */ > @@ -86,12 +115,19 @@ struct userdata { > pa_thread_mq thread_mq; > pa_rtpoll *rtpoll; > pa_rtpoll_item *rtpoll_item; > + bluetooth_msg *msg; > > int stream_fd; > + int stream_write_type; > size_t read_link_mtu; > size_t write_link_mtu; > size_t read_block_size; > size_t write_block_size; > + uint64_t read_index; > + uint64_t write_index; > + pa_usec_t started_at; > + pa_smoother *read_smoother; > + pa_memchunk write_memchunk; > pa_sample_spec sample_spec; > struct sbc_info sbc_info; > }; > @@ -210,6 +246,320 @@ static void connect_ports(struct userdata *u, void *new_data, pa_direction_t dir > } > } > > +/* Run from IO thread */ > +static void a2dp_prepare_buffer(struct userdata *u) { > + size_t min_buffer_size = PA_MAX(u->read_link_mtu, u->write_link_mtu); > + > + pa_assert(u); > + > + if (u->sbc_info.buffer_size >= min_buffer_size) > + return; > + > + u->sbc_info.buffer_size = 2 * min_buffer_size; > + pa_xfree(u->sbc_info.buffer); > + u->sbc_info.buffer = pa_xmalloc(u->sbc_info.buffer_size); > +} > + > +/* Run from IO thread */ > +static int a2dp_process_render(struct userdata *u) { > + struct sbc_info *sbc_info; > + struct rtp_header *header; > + struct rtp_payload *payload; > + size_t nbytes; > + void *d; > + const void *p; > + size_t to_write, to_encode; > + unsigned frame_count; > + int ret = 0; > + > + pa_assert(u); > + pa_assert(u->profile == PROFILE_A2DP_SINK); > + pa_assert(u->sink); > + > + /* First, render some data */ > + if (!u->write_memchunk.memblock) > + pa_sink_render_full(u->sink, u->write_block_size, &u->write_memchunk); > + > + pa_assert(u->write_memchunk.length == u->write_block_size); u->write_block_size may change due to bitpool reduction. 
If there was audio left in write_memchunk due to an earlier EAGAIN error, this assertion will crash. a2dp_process_render() needs to prepare for write_block_size changing between calls. > + > + a2dp_prepare_buffer(u); > + > + sbc_info = &u->sbc_info; > + header = sbc_info->buffer; > + payload = (struct rtp_payload*) ((uint8_t*) sbc_info->buffer + sizeof(*header)); > + > + frame_count = 0; > + > + /* Try to create a packet of the full MTU */ > + > + p = (const uint8_t *) pa_memblock_acquire_chunk(&u->write_memchunk); > + to_encode = u->write_memchunk.length; > + > + d = (uint8_t*) sbc_info->buffer + sizeof(*header) + sizeof(*payload); > + to_write = sbc_info->buffer_size - sizeof(*header) - sizeof(*payload); This assumes that sbc_info->buffer_size is big enough to hold at least rtp_header and rtp_payload structs. buffer_size is determined by the MTU, and there seems to be no check that the MTU values are sensible. > + > + while (PA_LIKELY(to_encode > 0 && to_write > 0)) { > + ssize_t written; > + ssize_t encoded; > + > + encoded = sbc_encode(&sbc_info->sbc, > + p, to_encode, > + d, to_write, > + &written); > + > + if (PA_UNLIKELY(encoded <= 0)) { > + pa_log_error("SBC encoding error (%li)", (long) encoded); > + pa_memblock_release(u->write_memchunk.memblock); > + return -1; > + } > + > + pa_assert_fp((size_t) encoded <= to_encode); > + pa_assert_fp((size_t) encoded == sbc_info->codesize); > + > + pa_assert_fp((size_t) written <= to_write); > + pa_assert_fp((size_t) written == sbc_info->frame_length); These equality assertions make me nervous. I suggest adding a comment: /* These assertions are safe, because sbc_encode() encodes exactly one * input block to exactly one output block (documented). If the buffer * sizes are too small, sbc_encode() returns zero or an error (not * documented), which we catch above. 
*/ pa_assert_fp((size_t) encoded <= to_encode); pa_assert_fp((size_t) encoded == sbc_info->codesize); pa_assert_fp((size_t) written <= to_write); pa_assert_fp((size_t) written == sbc_info->frame_length); > + > + p = (const uint8_t*) p + encoded; > + to_encode -= encoded; > + > + d = (uint8_t*) d + written; > + to_write -= written; > + > + frame_count++; > + } > + > + pa_memblock_release(u->write_memchunk.memblock); > + > + pa_assert(to_encode == 0); > + > + PA_ONCE_BEGIN { > + pa_log_debug("Using SBC encoder implementation: %s", pa_strnull(sbc_get_implementation_info(&sbc_info->sbc))); > + } PA_ONCE_END; > + > + /* write it to the fifo */ > + memset(sbc_info->buffer, 0, sizeof(*header) + sizeof(*payload)); > + header->v = 2; > + header->pt = 1; > + header->sequence_number = htons(sbc_info->seq_num++); > + header->timestamp = htonl(u->write_index / pa_frame_size(&u->sample_spec)); > + header->ssrc = htonl(1); > + payload->frame_count = frame_count; > + > + nbytes = (uint8_t*) d - (uint8_t*) sbc_info->buffer; > + > + for (;;) { > + ssize_t l; > + > + l = pa_write(u->stream_fd, sbc_info->buffer, nbytes, &u->stream_write_type); > + > + pa_assert(l != 0); > + > + if (l < 0) { > + > + if (errno == EINTR) > + /* Retry right away if we got interrupted */ > + continue; > + > + else if (errno == EAGAIN) > + /* Hmm, apparently the socket was not writable, give up for now */ > + break; > + > + pa_log_error("Failed to write data to socket: %s", pa_cstrerror(errno)); > + ret = -1; > + break; > + } > + > + pa_assert((size_t) l <= nbytes); > + > + if ((size_t) l != nbytes) { > + pa_log_warn("Wrote memory block to socket only partially! 
%llu written, wanted to write %llu.", > + (unsigned long long) l, > + (unsigned long long) nbytes); > + ret = -1; > + break; > + } > + > + u->write_index += (uint64_t) u->write_memchunk.length; > + pa_memblock_unref(u->write_memchunk.memblock); > + pa_memchunk_reset(&u->write_memchunk); > + > + ret = 1; > + > + break; > + } > + > + return ret; > +} > + > +/* Run from IO thread */ > +static int a2dp_process_push(struct userdata *u) { > + int ret = 0; > + pa_memchunk memchunk; > + > + pa_assert(u); > + pa_assert(u->profile == PROFILE_A2DP_SOURCE); > + pa_assert(u->source); > + pa_assert(u->read_smoother); > + > + memchunk.memblock = pa_memblock_new(u->core->mempool, u->read_block_size); > + memchunk.index = memchunk.length = 0; > + > + for (;;) { > + bool found_tstamp = false; > + pa_usec_t tstamp; > + struct sbc_info *sbc_info; > + struct rtp_header *header; > + struct rtp_payload *payload; > + const void *p; > + void *d; > + ssize_t l; > + size_t to_write, to_decode; > + > + a2dp_prepare_buffer(u); > + > + sbc_info = &u->sbc_info; > + header = sbc_info->buffer; > + payload = (struct rtp_payload*) ((uint8_t*) sbc_info->buffer + sizeof(*header)); > + > + l = pa_read(u->stream_fd, sbc_info->buffer, sbc_info->buffer_size, &u->stream_write_type); The code assumes later that what we read here is exactly one complete rtp packet, nothing more, nothing less. I don't understand how such an assumption can be made. I think we should parse the rtp header to find the packet boundary, and buffer any incomplete packets for later reading. Or perhaps recvmsg() should be used (I guess that's needed also for reading the timestamp, which is currently a TODO item). > + > + if (l <= 0) { > + > + if (l < 0 && errno == EINTR) > + /* Retry right away if we got interrupted */ > + continue; > + > + else if (l < 0 && errno == EAGAIN) > + /* Hmm, apparently the socket was not readable, give up for now. */ > + break; > + > + pa_log_error("Failed to read data from socket: %s", l < 0 ? 
pa_cstrerror(errno) : "EOF"); > + ret = -1; > + break; > + } > + > + pa_assert((size_t) l <= sbc_info->buffer_size); > + > + u->read_index += (uint64_t) l; > + > + /* TODO: get timestamp from rtp */ > + if (!found_tstamp) { > + /* pa_log_warn("Couldn't find SO_TIMESTAMP data in auxiliary recvmsg() data!"); */ > + tstamp = pa_rtclock_now(); > + } There are two kinds of timestamps being talked about here: RTP timestamps and socket message timestamps. I don't know what we would use the socket message timestamps for, but I think supporting RTP timestamps would be very important, because it would allow us to detect dropped packets. We should generate silence as a substitute for the lost packets. Not detecting dropped packets means that our timing information gets messed up, making it impossible for module-loopback to work properly. Are the RTP timestamp semantics defined well enough so that we could know how much silence we need to generate? We seem to use the number of written samples in our outgoing packets, is that what we can expect also from incoming packets? > + > + pa_smoother_put(u->read_smoother, tstamp, pa_bytes_to_usec(u->read_index, &u->sample_spec)); > + pa_smoother_resume(u->read_smoother, tstamp, true); > + > + p = (uint8_t*) sbc_info->buffer + sizeof(*header) + sizeof(*payload); > + to_decode = l - sizeof(*header) - sizeof(*payload); to_decode is unsigned, so this assumes that we read at least sizeof(*header) + sizeof(*payload) amount of data. That is not checked. 
> + > + d = pa_memblock_acquire(memchunk.memblock); > + to_write = memchunk.length = pa_memblock_get_length(memchunk.memblock); > + > + while (PA_LIKELY(to_decode > 0)) { > + size_t written; > + ssize_t decoded; > + > + decoded = sbc_decode(&sbc_info->sbc, > + p, to_decode, > + d, to_write, > + &written); > + > + if (PA_UNLIKELY(decoded <= 0)) { > + pa_log_error("SBC decoding error (%li)", (long) decoded); > + pa_memblock_release(memchunk.memblock); > + pa_memblock_unref(memchunk.memblock); > + return -1; > + } > + > + /* Reset frame length, it can be changed due to bitpool change */ > + sbc_info->frame_length = sbc_get_frame_length(&sbc_info->sbc); read_block_size is derived from the frame length, so that needs to be updated too. And the source fixed latency depends on read_block_size. And so does the buffer length that was passed to sbc_decode, so it may happen that we passed too small a buffer to sbc_decode(). > + > + pa_assert_fp((size_t) decoded <= to_decode); > + pa_assert_fp((size_t) decoded == sbc_info->frame_length); > + > + pa_assert_fp((size_t) written == sbc_info->codesize); This will crash if the output buffer was too small (an undocumented feature of sbc_decode is that it truncates the output if the given buffer is too small, instead of failing like sbc_encode() does). Removing this assertion isn't the right fix, though, the right fix is to ensure that we have a big enough output buffer. 
> + > + p = (const uint8_t*) p + decoded; > + to_decode -= decoded; > + > + d = (uint8_t*) d + written; > + to_write -= written; > + } > + > + memchunk.length -= to_write; > + > + pa_memblock_release(memchunk.memblock); > + > + pa_source_post(u->source, &memchunk); > + > + ret = l; > + break; > + } > + > + pa_memblock_unref(memchunk.memblock); > + > + return ret; > +} > + > +/* Run from I/O thread */ > +static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) { > + struct sbc_info *sbc_info; > + > + pa_assert(u); > + > + sbc_info = &u->sbc_info; > + > + if (sbc_info->sbc.bitpool == bitpool) > + return; > + > + if (bitpool > sbc_info->max_bitpool) > + bitpool = sbc_info->max_bitpool; > + else if (bitpool < sbc_info->min_bitpool) > + bitpool = sbc_info->min_bitpool; > + > + sbc_info->sbc.bitpool = bitpool; > + > + sbc_info->codesize = sbc_get_codesize(&sbc_info->sbc); > + sbc_info->frame_length = sbc_get_frame_length(&sbc_info->sbc); > + > + pa_log_debug("Bitpool has changed to %u", sbc_info->sbc.bitpool); Could you calculate the resulting SBC bitrate and add it to the log message? I think that would be nice to see in the log, because the bitpool value doesn't really tell anything meaningful to the vast majority of people (including me). If you do that, log the bitrate also in transport_config() and when the bitpool changes in the a2dp_source mode. 
> + > + u->read_block_size = > + (u->read_link_mtu - sizeof(struct rtp_header) - sizeof(struct rtp_payload)) > + / sbc_info->frame_length * sbc_info->codesize; > + > + u->write_block_size = > + (u->write_link_mtu - sizeof(struct rtp_header) - sizeof(struct rtp_payload)) > + / sbc_info->frame_length * sbc_info->codesize; > + > + pa_sink_set_max_request_within_thread(u->sink, u->write_block_size); > + pa_sink_set_fixed_latency_within_thread(u->sink, > + FIXED_LATENCY_PLAYBACK_A2DP + pa_bytes_to_usec(u->write_block_size, &u->sample_spec)); > +} > + > +/* Run from I/O thread */ > +static void a2dp_reduce_bitpool(struct userdata *u) { > + struct sbc_info *sbc_info; > + uint8_t bitpool; > + > + pa_assert(u); > + > + sbc_info = &u->sbc_info; > + > + /* Check if bitpool is already at its limit */ > + if (sbc_info->sbc.bitpool <= BITPOOL_DEC_LIMIT) > + return; > + > + bitpool = sbc_info->sbc.bitpool - BITPOOL_DEC_STEP; > + > + if (bitpool < BITPOOL_DEC_LIMIT) > + bitpool = BITPOOL_DEC_LIMIT; > + > + a2dp_set_bitpool(u, bitpool); > +} > + > static void teardown_stream(struct userdata *u) { > if (u->rtpoll_item) { > pa_rtpoll_item_free(u->rtpoll_item); > @@ -221,6 +571,16 @@ static void teardown_stream(struct userdata *u) { > u->stream_fd = -1; > } > > + if (u->read_smoother) { > + pa_smoother_free(u->read_smoother); > + u->read_smoother = NULL; > + } > + > + if (u->write_memchunk.memblock) { > + pa_memblock_unref(u->write_memchunk.memblock); > + pa_memchunk_reset(&u->write_memchunk); > + } > + > pa_log_debug("Audio stream torn down"); > } > > @@ -258,6 +618,62 @@ static void transport_release(struct userdata *u) { > teardown_stream(u); > } > > +/* Run from I/O thread */ > +static void transport_config_mtu(struct userdata *u) { > + u->read_block_size = > + (u->read_link_mtu - sizeof(struct rtp_header) - sizeof(struct rtp_payload)) > + / u->sbc_info.frame_length * u->sbc_info.codesize; > + > + u->write_block_size = > + (u->write_link_mtu - sizeof(struct rtp_header) - 
sizeof(struct rtp_payload)) > + / u->sbc_info.frame_length * u->sbc_info.codesize; > + > + if (u->sink) { > + pa_sink_set_max_request_within_thread(u->sink, u->write_block_size); > + pa_sink_set_fixed_latency_within_thread(u->sink, > + FIXED_LATENCY_PLAYBACK_A2DP + > + pa_bytes_to_usec(u->write_block_size, &u->sample_spec)); > + } > + > + if (u->source) > + pa_source_set_fixed_latency_within_thread(u->source, > + FIXED_LATENCY_RECORD_A2DP + > + pa_bytes_to_usec(u->read_block_size, &u->sample_spec)); > +} > + > +/* Run from I/O thread */ > +static void setup_stream(struct userdata *u) { > + struct pollfd *pollfd; > + int one; > + > + pa_log_info("Transport %s resuming", u->transport->path); > + > + transport_config_mtu(u); > + > + pa_make_fd_nonblock(u->stream_fd); > + pa_make_socket_low_delay(u->stream_fd); > + > + one = 1; > + if (setsockopt(u->stream_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) > + pa_log_warn("Failed to enable SO_TIMESTAMP: %s", pa_cstrerror(errno)); > + > + pa_log_debug("Stream properly set up, we're ready to roll!"); > + > + if (u->profile == PROFILE_A2DP_SINK) > + a2dp_set_bitpool(u, u->sbc_info.max_bitpool); > + > + u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1); > + pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); > + pollfd->fd = u->stream_fd; > + pollfd->events = pollfd->revents = 0; > + > + u->read_index = u->write_index = 0; > + u->started_at = 0; > + > + if (u->source) > + u->read_smoother = pa_smoother_new(PA_USEC_PER_SEC, 2*PA_USEC_PER_SEC, true, true, 10, pa_rtclock_now(), true); > +} > + > /* Run from main thread */ > static int add_source(struct userdata *u) { > pa_source_new_data data; > @@ -496,6 +912,194 @@ static int init_profile(struct userdata *u) { > > /* I/O thread function */ > static void thread_func(void *userdata) { > + struct userdata *u = userdata; > + unsigned do_write = 0; > + unsigned pending_read_bytes = 0; > + bool writable = false; > + > + pa_assert(u); > + 
pa_assert(u->transport); > + > + pa_log_debug("IO Thread starting up"); > + > + if (u->core->realtime_scheduling) > + pa_make_realtime(u->core->realtime_priority); > + > + pa_thread_mq_install(&u->thread_mq); > + > + /* Setup the stream only if the transport was already acquired */ > + if (u->transport_acquired) > + setup_stream(u); > + > + for (;;) { > + struct pollfd *pollfd; > + int ret; > + bool disable_timer = true; > + > + pollfd = u->rtpoll_item ? pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL) : NULL; > + > + if (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state)) { > + > + /* We should send two blocks to the device before we expect > + * a response. */ What response? I don't understand what this logic is trying to achieve. > + > + if (u->write_index == 0 && u->read_index <= 0) Using == 0 with write_index and <= 0 with read_index looks odd. read_index isn't any more susceptible to be negative than write_index (both have unsigned type anyway). > + do_write = 2; > + > + if (pollfd && (pollfd->revents & POLLIN)) { > + int n_read; > + > + n_read = a2dp_process_push(u); > + > + if (n_read < 0) > + goto io_fail; > + > + /* We just read something, so we are supposed to write something, too */ > + pending_read_bytes += n_read; > + do_write += pending_read_bytes / u->write_block_size; > + pending_read_bytes = pending_read_bytes % u->write_block_size; This is not relevant for A2DP. Although this doesn't actively do any harm in the A2DP mode either, for readability I think it would be a good idea to run this code only when it's actually useful. 
> + } > + } > + > + if (u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state)) { > + > + if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) > + pa_sink_process_rewind(u->sink, 0); > + > + if (pollfd) { > + if (pollfd->revents & POLLOUT) > + writable = true; > + > + if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0 && writable) { > + pa_usec_t time_passed; > + pa_usec_t audio_sent; > + > + /* Hmm, there is no input stream we could synchronize > + * to. So let's do things by time */ > + > + time_passed = pa_rtclock_now() - u->started_at; u->started_at is 0 in the beginning, and initialized to a sensible value only at the time of the first write attempt (BTW, I think it should be initialized only if the write actually succeeds). So in the first round time_passed will be very large, when it should be zero. This doesn't cause practical problems, because skipping is triggered only if write_index is greater than zero, but I think the code would be clearer if time_passed would be assigned zero here. > + audio_sent = pa_bytes_to_usec(u->write_index, &u->sample_spec); > + > + if (audio_sent <= time_passed) { > + pa_usec_t audio_to_send = time_passed - audio_sent; > + > + /* Never try to catch up for more than 100ms */ > + if (u->write_index > 0 && audio_to_send > MAX_PLAYBACK_CATCH_UP_USEC) { Here the "write_index > 0" check wouldn't be needed if time_passed would always have a sane value. > + pa_usec_t skip_usec; > + uint64_t skip_bytes; > + > + skip_usec = audio_to_send - MAX_PLAYBACK_CATCH_UP_USEC; > + skip_bytes = pa_usec_to_bytes(skip_usec, &u->sample_spec); > + > + if (skip_bytes > 0) { > + pa_memchunk tmp; > + > + pa_log_warn("Skipping %llu us (= %llu bytes) in audio stream", > + (unsigned long long) skip_usec, > + (unsigned long long) skip_bytes); > + > + pa_sink_render_full(u->sink, skip_bytes, &tmp); write_memchunk may contain buffered audio, which should be consumed first. 
I propose that the amount of audio to skip is saved in userdata, and handled within a2dp_process_render(), because fiddling with write_memchunk should ideally be done only in a2dp_process_render(). > + pa_memblock_unref(tmp.memblock); > + u->write_index += skip_bytes; > + > + if (u->profile == PROFILE_A2DP_SINK) > + a2dp_reduce_bitpool(u); > + } > + } > + > + do_write = 1; > + pending_read_bytes = 0; > + } > + } > + > + if (writable && do_write > 0) { > + int n_written; > + > + if (u->write_index <= 0) > + u->started_at = pa_rtclock_now(); > + > + if ((n_written = a2dp_process_render(u)) < 0) > + goto io_fail; > + > + if (n_written == 0) > + pa_log("Broken kernel: we got EAGAIN on write() after POLLOUT!"); > + > + do_write -= n_written; > + writable = false; > + } > + > + if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0) { We should enable the timer only if we have already successfully written something, so this condition should be added: "&& u->write_index > 0". > + pa_usec_t sleep_for; > + pa_usec_t time_passed, next_write_at; > + > + if (writable) { > + /* Hmm, there is no input stream we could synchronize > + * to. So let's estimate when we need to wake up the latest */ > + time_passed = pa_rtclock_now() - u->started_at; > + next_write_at = pa_bytes_to_usec(u->write_index, &u->sample_spec); > + sleep_for = time_passed < next_write_at ? next_write_at - time_passed : 0; > + /* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */ > + } else > + /* drop stream every 500 ms */ > + sleep_for = PA_USEC_PER_MSEC * 500; I don't think this works. I guess "drop stream" refers to the skipping functionality, but skipping is only done if the socket is writable. If the 500 ms timer fires, it means that the socket is not writable (otherwise we would have woken up due to a POLLOUT event). 
If dropping data every 500 ms is something that we want to do, the skipping code needs to be modified so that it is run also if the socket is not writable. Phew, this one took long. -- Tanu