Suspend pipe-source when all writers have closed the fifo. The source is automatically resumed as soon as any data is written to the pipe, and suspended again when the last writer closes the fifo. --- src/modules/module-pipe-source.c | 114 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 109 insertions(+), 5 deletions(-) diff --git a/src/modules/module-pipe-source.c b/src/modules/module-pipe-source.c index f8284c1..927fa89 100644 --- a/src/modules/module-pipe-source.c +++ b/src/modules/module-pipe-source.c @@ -62,6 +62,18 @@ PA_MODULE_USAGE( #define DEFAULT_FILE_NAME "/tmp/music.input" #define DEFAULT_SOURCE_NAME "fifo_input" +struct pipe_source_msg { + pa_msgobject parent; +}; + +typedef struct pipe_source_msg pipe_source_msg; +PA_DEFINE_PRIVATE_CLASS(pipe_source_msg, pa_msgobject); + +enum { + PIPE_SOURCE_SUSPEND, + PIPE_SOURCE_RESUME, +}; + struct userdata { pa_core *core; pa_module *module; @@ -71,7 +83,10 @@ struct userdata { pa_thread_mq thread_mq; pa_rtpoll *rtpoll; + pipe_source_msg *msg; + char *filename; + int corkfd; int fd; pa_memchunk memchunk; @@ -90,6 +105,37 @@ static const char* const valid_modargs[] = { NULL }; +/* Called from main context */ +static int pipe_source_process_msg( + pa_msgobject *o, + int code, + void *data, + int64_t offset, + pa_memchunk *chunk) { + + struct userdata *u = (struct userdata *) data; + + pa_assert(u); + + switch (code) { + case PIPE_SOURCE_SUSPEND: + pa_log_debug("Suspending source %s because no writers left", u->source->name); + pa_source_suspend(u->source, true, PA_SUSPEND_APPLICATION); + break; + + case PIPE_SOURCE_RESUME: + pa_log_debug("Resuming source %s because writer connected", u->source->name); + pa_source_suspend(u->source, false, PA_SUSPEND_APPLICATION); + break; + + default: + pa_assert_not_reached(); + } + + return 0; +} + +/* Called from thread context */ static int source_process_msg( pa_msgobject *o, int code, @@ -135,8 +181,15 @@ static void thread_func(void *userdata) { pollfd = 
pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); + /* Writer connected */ + if (u->corkfd >= 0 && pollfd->revents & POLLIN) { + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), PIPE_SOURCE_RESUME, u, 0, NULL, NULL); + pa_assert_se(pa_close(u->corkfd) == 0); + u->corkfd = -1; + } + /* Try to read some data and pass it on to the source driver */ - if (u->source->thread_info.state == PA_SOURCE_RUNNING && pollfd->revents) { + if (u->source->thread_info.state == PA_SOURCE_RUNNING && pollfd->revents & POLLIN) { ssize_t l; void *p; @@ -151,8 +204,6 @@ static void thread_func(void *userdata) { l = pa_read(u->fd, (uint8_t*) p + u->memchunk.index, pa_memblock_get_length(u->memchunk.memblock) - u->memchunk.index, &read_type); pa_memblock_release(u->memchunk.memblock); - pa_assert(l != 0); /* EOF cannot happen, since we opened the fifo for both reading and writing */ - if (l < 0) { if (errno == EINTR) @@ -162,6 +213,11 @@ static void thread_func(void *userdata) { goto fail; } + } else if (l == 0) { + + /* Nothing to read */ + pollfd->revents = 0; + } else { u->memchunk.length = (size_t) l; @@ -178,7 +234,7 @@ static void thread_func(void *userdata) { } /* Hmm, nothing to do. Let's sleep */ - pollfd->events = (short) (u->source->thread_info.state == PA_SOURCE_RUNNING ? POLLIN : 0); + pollfd->events = (short) ((u->source->thread_info.state == PA_SOURCE_RUNNING || u->corkfd >= 0) ? 
POLLIN : 0); if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) goto fail; @@ -188,6 +244,29 @@ static void thread_func(void *userdata) { pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); + /* Last writer disconnected but data may still be in the buffer */ + if (pollfd->revents & POLLHUP) { + int l = 0; + +#ifdef FIONREAD + ioctl(u->fd, FIONREAD, &l); +#endif + + if (!l && u->corkfd < 0) { + /* Suspend source */ + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), PIPE_SOURCE_SUSPEND, u, 0, NULL, NULL); + + /* Open fifo for writing to stop POLLHUP spam */ + if ((u->corkfd = pa_open_cloexec(u->filename, O_WRONLY, 0)) < 0) { + pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); + goto fail; + } + } + + /* Ignore POLLHUP anyway */ + pollfd->revents &= ~POLLHUP; + } + if (pollfd->revents & ~POLLIN) { pa_log("FIFO shutdown."); goto fail; @@ -212,6 +291,7 @@ int pa__init(pa_module *m) { pa_modargs *ma; struct pollfd *pollfd; pa_source_new_data data; + int rwfd; pa_assert(m); @@ -238,17 +318,35 @@ int pa__init(pa_module *m) { goto fail; } + u->corkfd = -1; + + if (!(u->msg = pa_msgobject_new(pipe_source_msg))) + goto fail; + + u->msg->parent.process_msg = pipe_source_process_msg; + u->filename = pa_runtime_path(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME)); if (mkfifo(u->filename, 0666) < 0) { pa_log("mkfifo('%s'): %s", u->filename, pa_cstrerror(errno)); goto fail; } - if ((u->fd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) { + + /* Open fifo for read and write first, so the next open for read-only + will succeed immediately */ + if ((rwfd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) { + pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); + goto fail; + } + + if ((u->fd = pa_open_cloexec(u->filename, O_RDONLY, 0)) < 0) { pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); + pa_assert_se(pa_close(rwfd) == 0); goto fail; } + pa_assert_se(pa_close(rwfd) == 0); + pa_make_fd_nonblock(u->fd); if (fstat(u->fd, &st) < 0) { @@ 
-360,6 +458,12 @@ void pa__done(pa_module *m) { pa_xfree(u->filename); } + if (u->msg) + pa_xfree(u->msg); + + if (u->corkfd >= 0) + pa_assert_se(pa_close(u->corkfd) == 0); + if (u->fd >= 0) pa_assert_se(pa_close(u->fd) == 0); -- 1.8.3.1