Currently the pipe-source does not produce any data if no writer is connected. This patch enable silence generator when last writer closed pipe. It will stop automatically when any data appears. --- src/modules/module-pipe-source.c | 112 ++++++++++++++++++++++++++++++--------- 1 file changed, 86 insertions(+), 26 deletions(-) diff --git a/src/modules/module-pipe-source.c b/src/modules/module-pipe-source.c index f8284c1..d5e6637 100644 --- a/src/modules/module-pipe-source.c +++ b/src/modules/module-pipe-source.c @@ -33,6 +33,7 @@ #include <sys/filio.h> #endif +#include <pulse/rtclock.h> #include <pulse/xmalloc.h> #include <pulsecore/core-error.h> @@ -71,7 +72,11 @@ struct userdata { pa_thread_mq thread_mq; pa_rtpoll *rtpoll; + pa_usec_t timestamp; + pa_usec_t latency; + char *filename; + int corkfd; int fd; pa_memchunk memchunk; @@ -90,6 +95,7 @@ static const char* const valid_modargs[] = { NULL }; +/* Called from thread context */ static int source_process_msg( pa_msgobject *o, int code, @@ -101,17 +107,28 @@ static int source_process_msg( switch (code) { + case PA_SOURCE_MESSAGE_SET_STATE: + + if (u->corkfd >= 0 && PA_SOURCE_IS_OPENED(PA_PTR_TO_UINT(data))) + u->timestamp = pa_rtclock_now(); + + break; + case PA_SOURCE_MESSAGE_GET_LATENCY: { - size_t n = 0; + int64_t latency = 0; + if (u->corkfd >= 0) + latency = (int64_t)u->timestamp - (int64_t)pa_rtclock_now(); #ifdef FIONREAD - int l; + else { + int l; - if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0) - n = (size_t) l; + if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0) + latency = pa_bytes_to_usec((size_t) l, &u->source->sample_spec); + } #endif - *((int64_t*) data) = pa_bytes_to_usec(n, &u->source->sample_spec); + *((int64_t*) data) = latency; return 0; } } @@ -129,6 +146,10 @@ static void thread_func(void *userdata) { pa_thread_mq_install(&u->thread_mq); + /* Close our writer here to start silence generation or suspend source if no writers left */ + pa_assert_se(pa_close(u->corkfd) == 0); + u->corkfd = -1; + for (;;) { int ret; struct pollfd *pollfd; @@ -136,7 +157,7 @@ static void thread_func(void *userdata) { pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); /* Try to read some data and pass it on to the source driver */ - if (u->source->thread_info.state == PA_SOURCE_RUNNING && pollfd->revents) { + if (pollfd->revents) { ssize_t l; void *p; @@ -151,47 +172,77 @@ static void thread_func(void *userdata) { l = pa_read(u->fd, (uint8_t*) p + u->memchunk.index, pa_memblock_get_length(u->memchunk.memblock) - u->memchunk.index, &read_type); pa_memblock_release(u->memchunk.memblock); - pa_assert(l != 0); /* EOF cannot happen, since we opened the fifo for both reading and writing */ + if (PA_LIKELY(l > 0)) { + if (PA_SOURCE_IS_OPENED(u->source->thread_info.state)) { + u->memchunk.length = (size_t) l; + pa_source_post(u->source, &u->memchunk); + u->memchunk.index += (size_t) l; + + if (u->memchunk.index >= pa_memblock_get_length(u->memchunk.memblock)) { + pa_memblock_unref(u->memchunk.memblock); + pa_memchunk_reset(&u->memchunk); + } + } - if (l < 0) { + if (u->corkfd >= 0) { + pa_assert_se(pa_close(u->corkfd) == 0); + u->corkfd = -1; + pa_rtpoll_set_timer_disabled(u->rtpoll); + } + } else if (l == 0) { + if (u->corkfd < 0) { + pa_log_debug("There are no writers left"); + + if ((u->corkfd = pa_open_cloexec(u->filename, O_WRONLY, 0)) < 0) { + pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); + goto fail; + } + + u->latency = pa_bytes_to_usec(pa_pipe_buf(u->fd), &u->source->sample_spec); + u->timestamp = pa_rtclock_now(); + } + } else { if (errno == EINTR) continue; else if (errno != EAGAIN) { pa_log("Failed to read data from FIFO: %s", pa_cstrerror(errno)); goto fail; } + } + } - } else { + if (u->corkfd >= 0 && PA_SOURCE_IS_OPENED(u->source->thread_info.state)) { + pa_usec_t now; + size_t l; - u->memchunk.length = (size_t) l; - pa_source_post(u->source, &u->memchunk); - u->memchunk.index += (size_t) l; + now = pa_rtclock_now(); + l = pa_usec_to_bytes(now - u->timestamp, &u->source->sample_spec); - if (u->memchunk.index >= pa_memblock_get_length(u->memchunk.memblock)) { - pa_memblock_unref(u->memchunk.memblock); - pa_memchunk_reset(&u->memchunk); - } + if (l > 0) { + pa_memchunk chunk = { + .index = 0, + .length = l, + .memblock = pa_memblock_new(u->core->mempool, l) + }; - pollfd->revents = 0; + pa_source_post(u->source, &chunk); + pa_memblock_unref(chunk.memblock); + + u->timestamp += pa_bytes_to_usec(l, &u->source->sample_spec); } + + pa_rtpoll_set_timer_absolute(u->rtpoll, u->timestamp + u->latency); } /* Hmm, nothing to do. Let's sleep */ - pollfd->events = (short) (u->source->thread_info.state == PA_SOURCE_RUNNING ? POLLIN : 0); + pollfd->events = POLLIN; if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) goto fail; if (ret == 0) goto finish; - - pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); - - if (pollfd->revents & ~POLLIN) { - pa_log("FIFO shutdown."); - goto fail; - } } fail: @@ -244,7 +295,13 @@ int pa__init(pa_module *m) { pa_log("mkfifo('%s'): %s", u->filename, pa_cstrerror(errno)); goto fail; } - if ((u->fd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) { + + if ((u->corkfd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) { + pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); + goto fail; + } + + if ((u->fd = pa_open_cloexec(u->filename, O_RDONLY, 0)) < 0) { pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); goto fail; } @@ -360,6 +417,9 @@ void pa__done(pa_module *m) { pa_xfree(u->filename); } + if (u->corkfd >= 0) + pa_assert_se(pa_close(u->corkfd) == 0); + if (u->fd >= 0) pa_assert_se(pa_close(u->fd) == 0); -- 1.8.3.1