On 02/20/2018 07:02 PM, Georg Chini wrote: > On 20.02.2018 16:38, Raman Shyshniou wrote: >> Currently the pipe-source will remain running even if no >> writer is connected and therefore no data is produced. >> This patch adds the autosuspend=<bool> option to prevent this. >> Source will stay suspended if no writer is connected. >> This option is enabled by default. >> --- >> src/modules/module-pipe-source.c | 279 +++++++++++++++++++++++++++++---------- >> 1 file changed, 212 insertions(+), 67 deletions(-) >> >> diff --git a/src/modules/module-pipe-source.c b/src/modules/module-pipe-source.c >> index f8284c1..359cdbf 100644 >> --- a/src/modules/module-pipe-source.c >> +++ b/src/modules/module-pipe-source.c >> @@ -33,6 +33,7 @@ >> #include <sys/filio.h> >> #endif >> +#include <pulse/rtclock.h> >> #include <pulse/xmalloc.h> >> #include <pulsecore/core-error.h> >> @@ -57,11 +58,24 @@ PA_MODULE_USAGE( >> "format=<sample format> " >> "rate=<sample rate> " >> "channels=<number of channels> " >> - "channel_map=<channel map>"); >> + "channel_map=<channel map> " >> + "autosuspend=<boolean>"); >> #define DEFAULT_FILE_NAME "/tmp/music.input" >> #define DEFAULT_SOURCE_NAME "fifo_input" >> +struct pipe_source_msg { >> + pa_msgobject parent; >> +}; >> + >> +typedef struct pipe_source_msg pipe_source_msg; >> +PA_DEFINE_PRIVATE_CLASS(pipe_source_msg, pa_msgobject); >> + >> +enum { >> + PIPE_SOURCE_SUSPEND, >> + PIPE_SOURCE_RESUME >> +}; >> + >> struct userdata { >> pa_core *core; >> pa_module *module; >> @@ -71,12 +85,14 @@ struct userdata { >> pa_thread_mq thread_mq; >> pa_rtpoll *rtpoll; >> + pipe_source_msg *msg; >> + pa_usec_t timestamp; >> + bool autosuspend; >> + size_t pipe_size; >> + >> char *filename; >> + int corkfd; >> int fd; >> - >> - pa_memchunk memchunk; >> - >> - pa_rtpoll_item *rtpoll_item; >> }; >> static const char* const valid_modargs[] = { >> @@ -87,9 +103,41 @@ static const char* const valid_modargs[] = { >> "rate", >> "channels", >> "channel_map", >> + 
"autosuspend", >> NULL >> }; >> +/* Called from main context */ >> +static int pipe_source_process_msg( >> + pa_msgobject *o, >> + int code, >> + void *data, >> + int64_t offset, >> + pa_memchunk *chunk) { >> + >> + struct userdata *u = (struct userdata *) data; >> + >> + pa_assert(u); >> + >> + switch (code) { >> + case PIPE_SOURCE_SUSPEND: >> + pa_log_debug("Suspending source %s because no writers left", u->source->name); >> + pa_source_suspend(u->source, true, PA_SUSPEND_APPLICATION); >> + break; >> + >> + case PIPE_SOURCE_RESUME: >> + pa_log_debug("Resuming source %s", u->source->name); >> + pa_source_suspend(u->source, false, PA_SUSPEND_APPLICATION); >> + break; >> + >> + default: >> + pa_assert_not_reached(); >> + } >> + >> + return 0; >> +} >> + >> +/* Called from thread context */ >> static int source_process_msg( >> pa_msgobject *o, >> int code, >> @@ -101,17 +149,19 @@ static int source_process_msg( >> switch (code) { >> - case PA_SOURCE_MESSAGE_GET_LATENCY: { >> - size_t n = 0; >> + case PA_SOURCE_MESSAGE_SET_STATE: >> -#ifdef FIONREAD >> - int l; >> + if (u->source->thread_info.state == PA_SOURCE_SUSPENDED || u->source->thread_info.state == PA_SOURCE_INIT) { >> + if (PA_SOURCE_IS_OPENED(PA_PTR_TO_UINT(data))) >> + u->timestamp = pa_rtclock_now(); >> + } >> - if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0) >> - n = (size_t) l; >> -#endif >> + break; >> + >> + case PA_SOURCE_MESSAGE_GET_LATENCY: { >> + >> + *((int64_t*) data) = PA_CLIP_SUB((int64_t)pa_rtclock_now(), (int64_t)u->timestamp); >> - *((int64_t*) data) = pa_bytes_to_usec(n, &u->source->sample_spec); >> return 0; >> } >> } >> @@ -121,7 +171,11 @@ static int source_process_msg( >> static void thread_func(void *userdata) { >> struct userdata *u = userdata; >> + pa_rtpoll_item *rtpoll_item; >> + struct pollfd *pollfd; >> + pa_memchunk chunk; >> int read_type = 0; >> + size_t fs; >> pa_assert(u); >> @@ -129,68 +183,140 @@ static void thread_func(void *userdata) { >> 
pa_thread_mq_install(&u->thread_mq); >> + fs = pa_frame_size(&u->source->sample_spec); >> + >> + pa_memchunk_reset(&chunk); >> + chunk.memblock = pa_memblock_new(u->core->mempool, u->pipe_size); >> + >> + rtpoll_item = NULL; >> + pollfd = NULL; >> + >> + u->timestamp = pa_rtclock_now(); >> + >> + /* Close our writer here to suspend source if no writers left */ >> + pa_assert_se(pa_close(u->corkfd) == 0); >> + u->corkfd = -1; >> + >> for (;;) { >> int ret; >> - struct pollfd *pollfd; >> - pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); >> + if (chunk.length) { >> + /* We have a pending data, let's stop polling pipe. >> + * Setting up pollfd->events = 0 is not enough to stop >> + * POLLHUP spam if all writers are closed pipe. >> + * We need to stop polling pipe completely */ >> + if (rtpoll_item) { >> + pa_rtpoll_item_free(rtpoll_item); >> + rtpoll_item = NULL; >> + } >> + } else { >> + /* We have no pending data, let's start polling pipe */ >> + if (rtpoll_item == NULL) { >> + rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1); >> + pollfd = pa_rtpoll_item_get_pollfd(rtpoll_item, NULL); >> + pollfd->events = POLLIN; >> + pollfd->fd = u->fd; >> + } >> + } > > Your code will allocate/deallocate the rtpoll_item on each > iteration. This is unnecessary and CPU intensive (I was told). > I would still prefer my approach and I only see disadvantages > with your way. > No, it's not like that. rtpoll_item will be allocated: - On thread start. - When we have just processed all pending data, i.e. the source was just resumed. rtpoll_item will be freed: - When we got some data from the pipe, but the source was just suspended. So there will be only one free per suspend and one allocation per resume. 
>> - /* Try to read some data and pass it on to the source driver */ >> - if (u->source->thread_info.state == PA_SOURCE_RUNNING && pollfd->revents) { >> - ssize_t l; >> - void *p; >> + if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) >> + goto fail; >> - if (!u->memchunk.memblock) { >> - u->memchunk.memblock = pa_memblock_new(u->core->mempool, pa_pipe_buf(u->fd)); >> - u->memchunk.index = u->memchunk.length = 0; >> - } >> + if (ret == 0) >> + goto finish; >> - pa_assert(pa_memblock_get_length(u->memchunk.memblock) > u->memchunk.index); >> + if (rtpoll_item) { >> + pollfd = pa_rtpoll_item_get_pollfd(rtpoll_item, NULL); >> - p = pa_memblock_acquire(u->memchunk.memblock); >> - l = pa_read(u->fd, (uint8_t*) p + u->memchunk.index, pa_memblock_get_length(u->memchunk.memblock) - u->memchunk.index, &read_type); >> - pa_memblock_release(u->memchunk.memblock); >> + if (pollfd->revents & ~(POLLIN|POLLHUP)) { >> + pa_log("FD error: %s%s%s%s", >> + pollfd->revents & POLLOUT ? "POLLOUT " : "", >> + pollfd->revents & POLLERR ? "POLLERR " : "", >> + pollfd->revents & POLLPRI ? "POLLPRI " : "", >> + pollfd->revents & POLLNVAL ? 
"POLLNVAL " : ""); >> - pa_assert(l != 0); /* EOF cannot happen, since we opened the fifo for both reading and writing */ >> + goto fail; >> + } >> + } else >> + pollfd = NULL; >> - if (l < 0) { >> + /* Try to read some data if there are any events */ >> + if (pollfd && pollfd->revents) { >> + ssize_t l; >> + void *p; >> - if (errno == EINTR) >> - continue; >> - else if (errno != EAGAIN) { >> - pa_log("Failed to read data from FIFO: %s", pa_cstrerror(errno)); >> - goto fail; >> - } >> + pa_assert(chunk.index < fs); >> - } else { >> + p = pa_memblock_acquire(chunk.memblock); >> + l = pa_read(u->fd, (uint8_t *) p + chunk.index, pa_memblock_get_length(chunk.memblock) - chunk.index, &read_type); >> + pa_memblock_release(chunk.memblock); >> - u->memchunk.length = (size_t) l; >> - pa_source_post(u->source, &u->memchunk); >> - u->memchunk.index += (size_t) l; >> + if (l > 0) { >> + chunk.length = (size_t) l; >> + u->timestamp = pa_rtclock_now(); >> - if (u->memchunk.index >= pa_memblock_get_length(u->memchunk.memblock)) { >> - pa_memblock_unref(u->memchunk.memblock); >> - pa_memchunk_reset(&u->memchunk); >> + if (u->corkfd >= 0) { >> + if (u->autosuspend) >> + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), PIPE_SOURCE_RESUME, u, 0, NULL, NULL); >> + >> + pa_assert_se(pa_close(u->corkfd) == 0); >> + u->corkfd = -1; >> + } >> + } else if (l == 0) { >> + /* Writer was disconnected: discard unalligned data tail */ >> + chunk.index = 0; >> + >> + if (u->corkfd < 0) { >> + if (u->autosuspend) >> + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), PIPE_SOURCE_SUSPEND, u, 0, NULL, NULL); >> + >> + if ((u->corkfd = pa_open_cloexec(u->filename, O_WRONLY, 0)) < 0) { >> + pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); >> + goto fail; >> + } >> } >> + } else { >> + if (errno == EINTR || errno == EAGAIN) >> + continue; >> - pollfd->revents = 0; >> + pa_log("Failed to read data from FIFO: %s", pa_cstrerror(errno)); >> + goto fail; >> } >> 
} >> - /* Hmm, nothing to do. Let's sleep */ >> - pollfd->events = (short) (u->source->thread_info.state == PA_SOURCE_RUNNING ? POLLIN : 0); >> + /* Post pending data. >> + * Let's keep frame boundaries */ >> + if (PA_SOURCE_IS_OPENED(u->source->thread_info.state)) { > > Just checking if the source is open before you post the data is also not > the right thing to do. When you are resuming from autosuspend, you > send a message to the main thread. You do not know if this message > has already been processed when you reach this part of the code. When I send the message to the main thread, I expect that u->source->thread_info.state will change only after source_process_msg() has been called, i.e. after at least one call to pa_rtpoll_run(). That's why I remove the fd from polling while waiting for the source to be resumed. > >> + size_t total_len = chunk.index + chunk.length; >> + size_t frames = total_len / fs; >> + size_t tail = total_len % fs; >> - if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) >> - goto fail; >> + if (frames > 0) { >> + pa_memblock *memblock; >> - if (ret == 0) >> - goto finish; >> + memblock = pa_memblock_new(u->core->mempool, u->pipe_size); >> + chunk.length = frames * fs; >> + chunk.index = 0; >> - pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); >> + if (tail) { >> + void *src, *dst; >> - if (pollfd->revents & ~POLLIN) { >> - pa_log("FIFO shutdown."); >> - goto fail; >> + dst = pa_memblock_acquire(memblock); >> + src = pa_memblock_acquire(chunk.memblock); >> + >> + memcpy(dst, (uint8_t *) src + chunk.length, tail); >> + >> + pa_memblock_release(chunk.memblock); >> + pa_memblock_release(memblock); >> + } >> + >> + pa_source_post(u->source, &chunk); >> + pa_memblock_unref(chunk.memblock); >> + chunk.memblock = memblock; >> + } >> + >> + chunk.index = tail; >> + chunk.length = 0; >> } >> } >> @@ -201,6 +327,11 @@ fail: >> pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); >> finish: >> + if (rtpoll_item) >> + 
pa_rtpoll_item_free(rtpoll_item); >> + >> + pa_memblock_unref(chunk.memblock); >> + >> pa_log_debug("Thread shutting down"); >> } >> @@ -210,7 +341,6 @@ int pa__init(pa_module *m) { >> pa_sample_spec ss; >> pa_channel_map map; >> pa_modargs *ma; >> - struct pollfd *pollfd; >> pa_source_new_data data; >> pa_assert(m); >> @@ -230,7 +360,6 @@ int pa__init(pa_module *m) { >> m->userdata = u = pa_xnew0(struct userdata, 1); >> u->core = m->core; >> u->module = m; >> - pa_memchunk_reset(&u->memchunk); >> u->rtpoll = pa_rtpoll_new(); >> if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) { >> @@ -238,13 +367,31 @@ int pa__init(pa_module *m) { >> goto fail; >> } >> + if (!(u->msg = pa_msgobject_new(pipe_source_msg))) >> + goto fail; >> + >> + u->msg->parent.process_msg = pipe_source_process_msg; >> + >> + u->autosuspend = true; >> + >> + if (pa_modargs_get_value_boolean(ma, "autosuspend", &u->autosuspend) < 0) { >> + pa_log("Failed to parse autosuspend argument."); >> + goto fail; >> + } >> + >> u->filename = pa_runtime_path(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME)); >> if (mkfifo(u->filename, 0666) < 0) { >> pa_log("mkfifo('%s'): %s", u->filename, pa_cstrerror(errno)); >> goto fail; >> } >> - if ((u->fd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) { >> + >> + if ((u->corkfd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) { >> + pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); >> + goto fail; >> + } >> + >> + if ((u->fd = pa_open_cloexec(u->filename, O_RDONLY, 0)) < 0) { >> pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); >> goto fail; >> } >> @@ -289,12 +436,10 @@ int pa__init(pa_module *m) { >> pa_source_set_asyncmsgq(u->source, u->thread_mq.inq); >> pa_source_set_rtpoll(u->source, u->rtpoll); >> - pa_source_set_fixed_latency(u->source, pa_bytes_to_usec(pa_pipe_buf(u->fd), &u->source->sample_spec)); >> - u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1); >> - pollfd = 
pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); >> - pollfd->fd = u->fd; >> - pollfd->events = pollfd->revents = 0; >> + u->pipe_size = pa_pipe_buf(u->fd); >> + u->pipe_size = PA_MIN(pa_mempool_block_size_max(u->core->mempool), u->pipe_size); >> + pa_source_set_fixed_latency(u->source, pa_bytes_to_usec(u->pipe_size, &u->source->sample_spec)); >> if (!(u->thread = pa_thread_new("pipe-source", thread_func, u))) { >> pa_log("Failed to create thread."); >> @@ -346,12 +491,6 @@ void pa__done(pa_module *m) { >> if (u->source) >> pa_source_unref(u->source); >> - if (u->memchunk.memblock) >> - pa_memblock_unref(u->memchunk.memblock); >> - >> - if (u->rtpoll_item) >> - pa_rtpoll_item_free(u->rtpoll_item); >> - >> if (u->rtpoll) >> pa_rtpoll_free(u->rtpoll); >> @@ -360,6 +499,12 @@ void pa__done(pa_module *m) { >> pa_xfree(u->filename); >> } >> + if (u->msg) >> + pa_xfree(u->msg); >> + >> + if (u->corkfd >= 0) >> + pa_assert_se(pa_close(u->corkfd) == 0); >> + >> if (u->fd >= 0) >> pa_assert_se(pa_close(u->fd) == 0); >> > > -- Raman