On 11.02.2018 00:36, Raman Shyshniou wrote: > Add autosuspend=<bool> option. Enabling this option will make > pipe-source suspended when all writers closed fifo. Source will > be automatically unsuspended if any data will be written to pipe > and suspended again when last writer closed fifo. Can you add a sentence about the motivation to the commit message? Something like "Currently the pipe-source will remain running even if no writer is connected and therefore no data is produced. This patch adds the autosuspend=<bool> option to prevent this. ..." Otherwise only two small comments below. > --- > src/modules/module-pipe-source.c | 136 +++++++++++++++++++++++++++++++++++++-- > 1 file changed, 129 insertions(+), 7 deletions(-) > > diff --git a/src/modules/module-pipe-source.c b/src/modules/module-pipe-source.c > index f8284c1..af7badc 100644 > --- a/src/modules/module-pipe-source.c > +++ b/src/modules/module-pipe-source.c > @@ -57,11 +57,24 @@ PA_MODULE_USAGE( > "format=<sample format> " > "rate=<sample rate> " > "channels=<number of channels> " > - "channel_map=<channel map>"); > + "channel_map=<channel map> " > + "autosuspend=<boolean>"); > > #define DEFAULT_FILE_NAME "/tmp/music.input" > #define DEFAULT_SOURCE_NAME "fifo_input" > > +struct pipe_source_msg { > + pa_msgobject parent; > +}; > + > +typedef struct pipe_source_msg pipe_source_msg; > +PA_DEFINE_PRIVATE_CLASS(pipe_source_msg, pa_msgobject); > + > +enum { > + PIPE_SOURCE_SUSPEND, > + PIPE_SOURCE_RESUME, > +}; > + > struct userdata { > pa_core *core; > pa_module *module; > @@ -71,7 +84,10 @@ struct userdata { > pa_thread_mq thread_mq; > pa_rtpoll *rtpoll; > > + pipe_source_msg *msg; > + > char *filename; > + int corkfd; > int fd; > > pa_memchunk memchunk; > @@ -87,9 +103,41 @@ static const char* const valid_modargs[] = { > "rate", > "channels", > "channel_map", > + "autosuspend", > NULL > }; > > +/* Called from main context */ > +static int pipe_source_process_msg( > + pa_msgobject *o, > + int code, > + void *data, > + int64_t offset, > + pa_memchunk *chunk) { > + > + struct userdata *u = (struct userdata *) data; > + > + pa_assert(u); > + > + switch (code) { > + case PIPE_SOURCE_SUSPEND: > + pa_log_debug("Suspending source %s because no writers left", u->source->name); > + pa_source_suspend(u->source, true, PA_SUSPEND_APPLICATION); > + break; > + > + case PIPE_SOURCE_RESUME: > + pa_log_debug("Resuming source %s because writer connected", u->source->name); > + pa_source_suspend(u->source, false, PA_SUSPEND_APPLICATION); > + break; > + > + default: > + pa_assert_not_reached(); > + } > + > + return 0; > +} > + > +/* Called from thread context */ > static int source_process_msg( > pa_msgobject *o, > int code, > @@ -135,8 +183,15 @@ static void thread_func(void *userdata) { > > pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); > > + /* Writer connected */ > + if (u->corkfd >= 0 && pollfd->revents & POLLIN) { > + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), PIPE_SOURCE_RESUME, u, 0, NULL, NULL); > + pa_assert_se(pa_close(u->corkfd) == 0); > + u->corkfd = -1; > + } > + > /* Try to read some data and pass it on to the source driver */ > - if (u->source->thread_info.state == PA_SOURCE_RUNNING && pollfd->revents) { > + if (u->source->thread_info.state == PA_SOURCE_RUNNING && pollfd->revents & POLLIN) { > ssize_t l; > void *p; > > @@ -151,8 +206,6 @@ static void thread_func(void *userdata) { > l = pa_read(u->fd, (uint8_t*) p + u->memchunk.index, pa_memblock_get_length(u->memchunk.memblock) - u->memchunk.index, &read_type); > pa_memblock_release(u->memchunk.memblock); > > - pa_assert(l != 0); /* EOF cannot happen, since we opened the fifo for both reading and writing */ > - > if (l < 0) { > > if (errno == EINTR) > @@ -162,6 +215,11 @@ static void thread_func(void *userdata) { > goto fail; > } > > + } else if (l == 0) { > + > + /* Nothing to read */ > + pollfd->revents = 0; > + > } else { > > u->memchunk.length = (size_t) l; > @@ -178,7 +236,7 @@ static void thread_func(void *userdata) { > } > > /* Hmm, nothing to do. Let's sleep */ > - pollfd->events = (short) (u->source->thread_info.state == PA_SOURCE_RUNNING ? POLLIN : 0); > + pollfd->events = (short) ((u->source->thread_info.state == PA_SOURCE_RUNNING || u->corkfd >= 0) ? POLLIN : 0); > > if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) > goto fail; > @@ -188,6 +246,29 @@ static void thread_func(void *userdata) { > > pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); > > + /* Last writer disconnected but data may still be in the buffer */ > + if (pollfd->revents & POLLHUP) { > + int l = 0; > + > +#ifdef FIONREAD > + ioctl(u->fd, FIONREAD, &l); > +#endif > + > + if (!l && u->corkfd < 0) { > + /* Suspend source */ > + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), PIPE_SOURCE_SUSPEND, u, 0, NULL, NULL); > + > + /* Open fifo for writing to stop POLLHUP spam */ > + if ((u->corkfd = pa_open_cloexec(u->filename, O_WRONLY, 0)) < 0) { > + pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); > + goto fail; > + } > + } > + > + /* Ignore POOLHUP anyway */ typo above. > + pollfd->revents &= ~POLLHUP; > + } > + > if (pollfd->revents & ~POLLIN) { > pa_log("FIFO shutdown."); > goto fail; > @@ -212,6 +293,7 @@ int pa__init(pa_module *m) { > pa_modargs *ma; > struct pollfd *pollfd; > pa_source_new_data data; > + bool autosuspend = false; > > pa_assert(m); > > @@ -238,17 +320,51 @@ int pa__init(pa_module *m) { > goto fail; > } > > + u->corkfd = -1; > + > + if (!(u->msg = pa_msgobject_new(pipe_source_msg))) > + goto fail; > + > + u->msg->parent.process_msg = pipe_source_process_msg; > + > u->filename = pa_runtime_path(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME)); > > if (mkfifo(u->filename, 0666) < 0) { > pa_log("mkfifo('%s'): %s", u->filename, pa_cstrerror(errno)); > goto fail; > } > - if ((u->fd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) { > - pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); > + > + if (pa_modargs_get_value_boolean(ma, "autosuspend", &autosuspend) < 0) { > + pa_log("Failed to parse autosuspend= argument."); > goto fail; > } > > + if (autosuspend) { > + int rwfd; > + > + /* Open fifo for read and write first, so the next open for read-only > + will be succeded immediately */ > + if ((rwfd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) { > + pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); > + goto fail; > + } > + > + if ((u->fd = pa_open_cloexec(u->filename, O_RDONLY, 0)) < 0) { > + pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); > + pa_assert_se(pa_close(rwfd) == 0); > + goto fail; > + } > + You could #ifdef O_NONBLOCK and only open the FIFO for RDWR if the option does not exist. > + pa_assert_se(pa_close(rwfd) == 0); > + > + } else { > + /* Open fifo for read and write, so there will be at least one writer */ > + if ((u->fd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) { > + pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); > + goto fail; > + } > + } > + > pa_make_fd_nonblock(u->fd); > > if (fstat(u->fd, &st) < 0) { > @@ -360,6 +476,12 @@ void pa__done(pa_module *m) { > pa_xfree(u->filename); > } > > + if (u->msg) > + pa_xfree(u->msg); > + > + if (u->corkfd >= 0) > + pa_assert_se(pa_close(u->corkfd) == 0); > + > if (u->fd >= 0) > pa_assert_se(pa_close(u->fd) == 0); >