On 02/13/2018 06:37 PM, Georg Chini wrote: > On 11.02.2018 00:36, Raman Shyshniou wrote: >> Add autosuspend=<bool> option. Enabling this option will make >> pipe-source suspended when all writers closed fifo. Source will >> be automatically unsuspended if any data will be written to pipe >> and suspended again when last writer closed fifo. > Can you add a sentence about the motivation to the commit > message? Something like "Currently the pipe-source will remain > running even if no writer is connected and therefore no data is > produced. This patch adds the autosuspend=<bool> option to > prevent this. ..." > Ok. > Otherwise only two small comments below. >> --- >> src/modules/module-pipe-source.c | 136 +++++++++++++++++++++++++++++++++++++-- >> 1 file changed, 129 insertions(+), 7 deletions(-) >> >> diff --git a/src/modules/module-pipe-source.c b/src/modules/module-pipe-source.c >> index f8284c1..af7badc 100644 >> --- a/src/modules/module-pipe-source.c >> +++ b/src/modules/module-pipe-source.c >> @@ -57,11 +57,24 @@ PA_MODULE_USAGE( >> "format=<sample format> " >> "rate=<sample rate> " >> "channels=<number of channels> " >> - "channel_map=<channel map>"); >> + "channel_map=<channel map> " >> + "autosuspend=<boolean>"); >> #define DEFAULT_FILE_NAME "/tmp/music.input" >> #define DEFAULT_SOURCE_NAME "fifo_input" >> +struct pipe_source_msg { >> + pa_msgobject parent; >> +}; >> + >> +typedef struct pipe_source_msg pipe_source_msg; >> +PA_DEFINE_PRIVATE_CLASS(pipe_source_msg, pa_msgobject); >> + >> +enum { >> + PIPE_SOURCE_SUSPEND, >> + PIPE_SOURCE_RESUME, >> +}; >> + >> struct userdata { >> pa_core *core; >> pa_module *module; >> @@ -71,7 +84,10 @@ struct userdata { >> pa_thread_mq thread_mq; >> pa_rtpoll *rtpoll; >> + pipe_source_msg *msg; >> + >> char *filename; >> + int corkfd; >> int fd; >> pa_memchunk memchunk; >> @@ -87,9 +103,41 @@ static const char* const valid_modargs[] = { >> "rate", >> "channels", >> "channel_map", >> + "autosuspend", >> NULL >> }; >> +/* 
Called from main context */ >> +static int pipe_source_process_msg( >> + pa_msgobject *o, >> + int code, >> + void *data, >> + int64_t offset, >> + pa_memchunk *chunk) { >> + >> + struct userdata *u = (struct userdata *) data; >> + >> + pa_assert(u); >> + >> + switch (code) { >> + case PIPE_SOURCE_SUSPEND: >> + pa_log_debug("Suspending source %s because no writers left", u->source->name); >> + pa_source_suspend(u->source, true, PA_SUSPEND_APPLICATION); >> + break; >> + >> + case PIPE_SOURCE_RESUME: >> + pa_log_debug("Resuming source %s because writer connected", u->source->name); >> + pa_source_suspend(u->source, false, PA_SUSPEND_APPLICATION); >> + break; >> + >> + default: >> + pa_assert_not_reached(); >> + } >> + >> + return 0; >> +} >> + >> +/* Called from thread context */ >> static int source_process_msg( >> pa_msgobject *o, >> int code, >> @@ -135,8 +183,15 @@ static void thread_func(void *userdata) { >> pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); >> + /* Writer connected */ >> + if (u->corkfd >= 0 && pollfd->revents & POLLIN) { >> + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), PIPE_SOURCE_RESUME, u, 0, NULL, NULL); >> + pa_assert_se(pa_close(u->corkfd) == 0); >> + u->corkfd = -1; >> + } >> + >> /* Try to read some data and pass it on to the source driver */ >> - if (u->source->thread_info.state == PA_SOURCE_RUNNING && pollfd->revents) { >> + if (u->source->thread_info.state == PA_SOURCE_RUNNING && pollfd->revents & POLLIN) { >> ssize_t l; >> void *p; >> @@ -151,8 +206,6 @@ static void thread_func(void *userdata) { >> l = pa_read(u->fd, (uint8_t*) p + u->memchunk.index, pa_memblock_get_length(u->memchunk.memblock) - u->memchunk.index, &read_type); >> pa_memblock_release(u->memchunk.memblock); >> - pa_assert(l != 0); /* EOF cannot happen, since we opened the fifo for both reading and writing */ >> - >> if (l < 0) { >> if (errno == EINTR) >> @@ -162,6 +215,11 @@ static void thread_func(void *userdata) { >> goto fail; >> } >> 
+ } else if (l == 0) { >> + >> + /* Nothing to read */ >> + pollfd->revents = 0; >> + >> } else { >> u->memchunk.length = (size_t) l; >> @@ -178,7 +236,7 @@ static void thread_func(void *userdata) { >> } >> /* Hmm, nothing to do. Let's sleep */ >> - pollfd->events = (short) (u->source->thread_info.state == PA_SOURCE_RUNNING ? POLLIN : 0); >> + pollfd->events = (short) ((u->source->thread_info.state == PA_SOURCE_RUNNING || u->corkfd >= 0) ? POLLIN : 0); >> if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) >> goto fail; >> @@ -188,6 +246,29 @@ static void thread_func(void *userdata) { >> pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); >> + /* Last writer disconnected but data may still be in the buffer */ >> + if (pollfd->revents & POLLHUP) { >> + int l = 0; >> + >> +#ifdef FIONREAD >> + ioctl(u->fd, FIONREAD, &l); >> +#endif >> + >> + if (!l && u->corkfd < 0) { >> + /* Suspend source */ >> + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), PIPE_SOURCE_SUSPEND, u, 0, NULL, NULL); >> + >> + /* Open fifo for writing to stop POLLHUP spam */ >> + if ((u->corkfd = pa_open_cloexec(u->filename, O_WRONLY, 0)) < 0) { >> + pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); >> + goto fail; >> + } >> + } >> + >> + /* Ignore POOLHUP anyway */ > > typo above. 
> >> + pollfd->revents &= ~POLLHUP; >> + } >> + >> if (pollfd->revents & ~POLLIN) { >> pa_log("FIFO shutdown."); >> goto fail; >> @@ -212,6 +293,7 @@ int pa__init(pa_module *m) { >> pa_modargs *ma; >> struct pollfd *pollfd; >> pa_source_new_data data; >> + bool autosuspend = false; >> pa_assert(m); >> @@ -238,17 +320,51 @@ int pa__init(pa_module *m) { >> goto fail; >> } >> + u->corkfd = -1; >> + >> + if (!(u->msg = pa_msgobject_new(pipe_source_msg))) >> + goto fail; >> + >> + u->msg->parent.process_msg = pipe_source_process_msg; >> + >> u->filename = pa_runtime_path(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME)); >> if (mkfifo(u->filename, 0666) < 0) { >> pa_log("mkfifo('%s'): %s", u->filename, pa_cstrerror(errno)); >> goto fail; >> } >> - if ((u->fd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) { >> - pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); >> + >> + if (pa_modargs_get_value_boolean(ma, "autosuspend", &autosuspend) < 0) { >> + pa_log("Failed to parse autosuspend= argument."); >> goto fail; >> } >> + if (autosuspend) { >> + int rwfd; >> + >> + /* Open fifo for read and write first, so the next open for read-only >> + will be succeded immediately */ >> + if ((rwfd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) { >> + pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); >> + goto fail; >> + } >> + >> + if ((u->fd = pa_open_cloexec(u->filename, O_RDONLY, 0)) < 0) { >> + pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); >> + pa_assert_se(pa_close(rwfd) == 0); >> + goto fail; >> + } >> + > > You could #ifdef O_NONBLOCK and only open the FIFO for RDWR if the > option does not exist. > Agreed. I was thinking of SOCK_NONBLOCK, which has only been supported since Linux 2.6.27, but O_NONBLOCK for open(2) has always been supported.
>> + pa_assert_se(pa_close(rwfd) == 0); >> + >> + } else { >> + /* Open fifo for read and write, so there will be at least one writer */ >> + if ((u->fd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) { >> + pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); >> + goto fail; >> + } >> + } >> + >> pa_make_fd_nonblock(u->fd); >> if (fstat(u->fd, &st) < 0) { >> @@ -360,6 +476,12 @@ void pa__done(pa_module *m) { >> pa_xfree(u->filename); >> } >> + if (u->msg) >> + pa_xfree(u->msg); >> + >> + if (u->corkfd >= 0) >> + pa_assert_se(pa_close(u->corkfd) == 0); >> + >> if (u->fd >= 0) >> pa_assert_se(pa_close(u->fd) == 0); >> > >