On 02/20/2018 03:11 PM, Georg Chini wrote: > On 19.02.2018 16:01, Raman Shyshniou wrote: >> Currently the pipe-source will remain running even if no >> writer is connected and therefore no data is produced. >> This patch adds the autosuspend=<bool> option to prevent this. >> Source will stay suspended if no writer is connected. >> This option is enabled by default. >> --- >> src/modules/module-pipe-source.c | 279 ++++++++++++++++++++++++++++++--------- >> 1 file changed, 215 insertions(+), 64 deletions(-) >> >> diff --git a/src/modules/module-pipe-source.c b/src/modules/module-pipe-source.c >> index f8284c1..c1a1e9c 100644 >> --- a/src/modules/module-pipe-source.c >> +++ b/src/modules/module-pipe-source.c >> @@ -33,6 +33,7 @@ >> #include <sys/filio.h> >> #endif >> +#include <pulse/rtclock.h> >> #include <pulse/xmalloc.h> >> #include <pulsecore/core-error.h> >> @@ -57,11 +58,24 @@ PA_MODULE_USAGE( >> "format=<sample format> " >> "rate=<sample rate> " >> "channels=<number of channels> " >> - "channel_map=<channel map>"); >> + "channel_map=<channel map> " >> + "autosuspend=<boolean>"); >> #define DEFAULT_FILE_NAME "/tmp/music.input" >> #define DEFAULT_SOURCE_NAME "fifo_input" >> +struct pipe_source_msg { >> + pa_msgobject parent; >> +}; >> + >> +typedef struct pipe_source_msg pipe_source_msg; >> +PA_DEFINE_PRIVATE_CLASS(pipe_source_msg, pa_msgobject); >> + >> +enum { >> + PIPE_SOURCE_SUSPEND, >> + PIPE_SOURCE_RESUME >> +}; >> + >> struct userdata { >> pa_core *core; >> pa_module *module; >> @@ -71,12 +85,14 @@ struct userdata { >> pa_thread_mq thread_mq; >> pa_rtpoll *rtpoll; >> + pipe_source_msg *msg; >> + pa_usec_t timestamp; >> + bool autosuspend; >> + size_t pipe_size; >> + >> char *filename; >> + int corkfd; >> int fd; >> - >> - pa_memchunk memchunk; >> - >> - pa_rtpoll_item *rtpoll_item; >> }; >> static const char* const valid_modargs[] = { >> @@ -87,9 +103,41 @@ static const char* const valid_modargs[] = { >> "rate", >> "channels", >> "channel_map", >> + 
"autosuspend", >> NULL >> }; >> +/* Called from main context */ >> +static int pipe_source_process_msg( >> + pa_msgobject *o, >> + int code, >> + void *data, >> + int64_t offset, >> + pa_memchunk *chunk) { >> + >> + struct userdata *u = (struct userdata *) data; >> + >> + pa_assert(u); >> + >> + switch (code) { >> + case PIPE_SOURCE_SUSPEND: >> + pa_log_debug("Suspending source %s because no writers left", u->source->name); >> + pa_source_suspend(u->source, true, PA_SUSPEND_APPLICATION); >> + break; >> + >> + case PIPE_SOURCE_RESUME: >> + pa_log_debug("Resuming source %s", u->source->name); >> + pa_source_suspend(u->source, false, PA_SUSPEND_APPLICATION); >> + break; >> + >> + default: >> + pa_assert_not_reached(); >> + } >> + >> + return 0; >> +} >> + >> +/* Called from thread context */ >> static int source_process_msg( >> pa_msgobject *o, >> int code, >> @@ -101,17 +149,30 @@ static int source_process_msg( >> switch (code) { >> + case PA_SOURCE_MESSAGE_SET_STATE: >> + >> + if (u->source->thread_info.state == PA_SOURCE_SUSPENDED || u->source->thread_info.state == PA_SOURCE_INIT) { >> + if (PA_SOURCE_IS_OPENED(PA_PTR_TO_UINT(data))) >> + u->timestamp = pa_rtclock_now(); >> + } >> + >> + break; >> + >> case PA_SOURCE_MESSAGE_GET_LATENCY: { >> - size_t n = 0; >> + int64_t latency; >> + >> + latency = PA_CLIP_SUB((int64_t)pa_rtclock_now(), (int64_t)u->timestamp); >> #ifdef FIONREAD >> - int l; >> + if (u->corkfd < 0) { >> + int l; >> - if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0) >> - n = (size_t) l; >> + if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0) >> + latency += pa_bytes_to_usec((uint64_t)l, &u->source->sample_spec); >> + } >> #endif >> > > I don't think it is correct to add the data in the pipe to the latency. > Essentially, the pipe should always be empty, because you read > all data as soon as it is available. 
If a block of data is written to the > pipe, you should already have accounted for that block through > the time difference between now and the time stamp. Yes. You are right. I'll drop this code. > >> - *((int64_t*) data) = pa_bytes_to_usec(n, &u->source->sample_spec); >> + *((int64_t*) data) = latency; >> return 0; >> } >> } >> @@ -121,7 +182,11 @@ static int source_process_msg( >> static void thread_func(void *userdata) { >> struct userdata *u = userdata; >> + pa_rtpoll_item *rtpoll_item; >> + struct pollfd *pollfd; >> + pa_memchunk chunk; >> int read_type = 0; >> + size_t fs; >> pa_assert(u); >> @@ -129,68 +194,135 @@ static void thread_func(void *userdata) { >> pa_thread_mq_install(&u->thread_mq); >> + fs = pa_frame_size(&u->source->sample_spec); >> + >> + pa_memchunk_reset(&chunk); >> + chunk.memblock = pa_memblock_new(u->core->mempool, u->pipe_size); > > Further down, you might put some data in the memchunk before the > read if the previous data was not frame aligned, therefore the memblock > size must be u->pipe_size + fs. > >> + >> + rtpoll_item = NULL; >> + pollfd = NULL; >> + >> + u->timestamp = pa_rtclock_now(); >> + >> + /* Close our writer here to suspend source if no writers left */ >> + pa_assert_se(pa_close(u->corkfd) == 0); >> + u->corkfd = -1; >> + >> for (;;) { >> int ret; >> - struct pollfd *pollfd; >> - pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); >> + if (chunk.length) { >> + /* We have a pending data, let's stop polling pipe. >> + * Setting up pollfd->events = 0 is not enough to stop >> + * POLLHUP spam if all writers are closed pipe. 
>> + * We need to stop polling pipe completely */ >> + if (rtpoll_item) { >> + pa_rtpoll_item_free(rtpoll_item); >> + rtpoll_item = NULL; >> + } >> + } else { >> + /* We have no pending data, let's start polling pipe */ >> + if (rtpoll_item == NULL) { >> + rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1); >> + pollfd = pa_rtpoll_item_get_pollfd(rtpoll_item, NULL); >> + pollfd->events = POLLIN; >> + pollfd->fd = u->fd; >> + } >> + } > > Why do you need to do that? As in your previous patches you > open the pipe for writing if all writers are disconnected, which > should stop POLLHUP's. This patch tries to be platform-independent. Unfortunately, not all platforms set POLLHUP when a writer closes the pipe. Moreover, when POLLHUP is set there may still be data in the pipe that must be read before suspending. Linux sets POLLIN in that case. FreeBSD and macOS set POLLIN|POLLHUP when no writers are left, even if there is no data in the pipe. Some platforms do not set POLLHUP at all. So calling read() and checking whether it returns 0 must be the only condition for suspending the source and opening the pipe for writing. It also covers the case where the source was suspended by the user: we read data from the pipe and send a resume message, but the source stays suspended. We must then stop polling the pipe, and setting pollfd->events = 0 is not enough.
If a writer does an open-write-close while the source stays suspended, we will get endless POLLHUPs. > >> - /* Try to read some data and pass it on to the source driver */ >> - if (u->source->thread_info.state == PA_SOURCE_RUNNING && pollfd->revents) { >> - ssize_t l; >> - void *p; >> + if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) >> + goto fail; >> - if (!u->memchunk.memblock) { >> - u->memchunk.memblock = pa_memblock_new(u->core->mempool, pa_pipe_buf(u->fd)); >> - u->memchunk.index = u->memchunk.length = 0; >> - } >> + if (ret == 0) >> + goto finish; >> - pa_assert(pa_memblock_get_length(u->memchunk.memblock) > u->memchunk.index); >> + if (rtpoll_item) { >> + pollfd = pa_rtpoll_item_get_pollfd(rtpoll_item, NULL); >> - p = pa_memblock_acquire(u->memchunk.memblock); >> - l = pa_read(u->fd, (uint8_t*) p + u->memchunk.index, pa_memblock_get_length(u->memchunk.memblock) - u->memchunk.index, &read_type); >> - pa_memblock_release(u->memchunk.memblock); >> + if (pollfd->revents & ~(POLLIN | POLLHUP)) { >> + pa_log("FIFO shutdown."); > > You could be more verbose what has happened - see for example > the thread function in module-bluez5-device. OK, I'll do that.
> >> + goto fail; >> + } >> + } else >> + pollfd = NULL; >> - pa_assert(l != 0); /* EOF cannot happen, since we opened the fifo for both reading and writing */ >> + /* Try to read some data if there are any events */ >> + if (pollfd && pollfd->revents) { >> + ssize_t l; >> + void *p; >> - if (l < 0) { >> + pa_assert(chunk.index < fs); >> - if (errno == EINTR) >> - continue; >> - else if (errno != EAGAIN) { >> - pa_log("Failed to read data from FIFO: %s", pa_cstrerror(errno)); >> - goto fail; >> - } >> + p = pa_memblock_acquire(chunk.memblock); >> + l = pa_read(u->fd, (uint8_t *) p + chunk.index, pa_memblock_get_length(chunk.memblock) - chunk.index, &read_type); >> + pa_memblock_release(chunk.memblock); >> - } else { >> + if (l > 0) { >> + chunk.length = (size_t) l; >> + u->timestamp = pa_rtclock_now(); >> - u->memchunk.length = (size_t) l; >> - pa_source_post(u->source, &u->memchunk); >> - u->memchunk.index += (size_t) l; >> + if (u->corkfd >= 0) { >> + if (u->autosuspend) >> + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), PIPE_SOURCE_RESUME, u, 0, NULL, NULL); >> - if (u->memchunk.index >= pa_memblock_get_length(u->memchunk.memblock)) { >> - pa_memblock_unref(u->memchunk.memblock); >> - pa_memchunk_reset(&u->memchunk); >> + pa_assert_se(pa_close(u->corkfd) == 0); >> + u->corkfd = -1; >> + } >> + } else if (l == 0) { >> + /* Writer was disconnected: discard unalligned data tail */ >> + chunk.index = 0; >> + >> + if (u->corkfd < 0) { >> + if (u->autosuspend) >> + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), PIPE_SOURCE_SUSPEND, u, 0, NULL, NULL); >> + >> + if ((u->corkfd = pa_open_cloexec(u->filename, O_WRONLY, 0)) < 0) { >> + pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); >> + goto fail; >> + } >> } >> + } else { >> + if (errno == EINTR || errno == EAGAIN) >> + continue; >> - pollfd->revents = 0; >> + pa_log("Failed to read data from FIFO: %s", pa_cstrerror(errno)); >> + goto fail; >> } >> } >> - /* Hmm, nothing 
to do. Let's sleep */ >> - pollfd->events = (short) (u->source->thread_info.state == PA_SOURCE_RUNNING ? POLLIN : 0); >> + /* Post pending data. >> + * Let's keep frame boundaries */ >> + if (PA_SOURCE_IS_OPENED(u->source->thread_info.state)) { >> + size_t total_len = chunk.index + chunk.length; >> + size_t frames = total_len / fs; >> + size_t tail = total_len % fs; >> - if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) >> - goto fail; >> + if (frames > 0) { >> + pa_memblock *memblock; >> - if (ret == 0) >> - goto finish; >> + memblock = pa_memblock_new(u->core->mempool, u->pipe_size); > > Same comment about the memblock size as above. > >> + chunk.length = frames * fs; >> + chunk.index = 0; >> - pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); >> + if (tail) { >> + void *src, *dst; >> - if (pollfd->revents & ~POLLIN) { >> - pa_log("FIFO shutdown."); >> - goto fail; >> + dst = pa_memblock_acquire(memblock); >> + src = pa_memblock_acquire(chunk.memblock); >> + >> + memcpy(dst, (uint8_t *) src + chunk.length, tail); >> + >> + pa_memblock_release(chunk.memblock); >> + pa_memblock_release(memblock); >> + } >> + >> + pa_source_post(u->source, &chunk); >> + pa_memblock_unref(chunk.memblock); >> + chunk.memblock = memblock; >> + } >> + >> + chunk.index = tail; >> + chunk.length = 0; >> } >> } >> @@ -201,6 +333,11 @@ fail: >> pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); >> finish: >> + if (rtpoll_item) >> + pa_rtpoll_item_free(rtpoll_item); >> + >> + pa_memblock_unref(chunk.memblock); >> + >> pa_log_debug("Thread shutting down"); >> } >> @@ -210,7 +347,6 @@ int pa__init(pa_module *m) { >> pa_sample_spec ss; >> pa_channel_map map; >> pa_modargs *ma; >> - struct pollfd *pollfd; >> pa_source_new_data data; >> pa_assert(m); >> @@ -230,7 +366,6 @@ int pa__init(pa_module *m) { >> m->userdata = u = pa_xnew0(struct userdata, 1); >> u->core = m->core; >> u->module = m; >> - pa_memchunk_reset(&u->memchunk); >> u->rtpoll = pa_rtpoll_new(); >> if 
(pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) { >> @@ -238,13 +373,31 @@ int pa__init(pa_module *m) { >> goto fail; >> } >> + if (!(u->msg = pa_msgobject_new(pipe_source_msg))) >> + goto fail; >> + >> + u->msg->parent.process_msg = pipe_source_process_msg; >> + >> + u->autosuspend = true; >> + >> + if (pa_modargs_get_value_boolean(ma, "autosuspend", &u->autosuspend) < 0) { >> + pa_log("Failed to parse autosuspend argument."); >> + goto fail; >> + } >> + >> u->filename = pa_runtime_path(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME)); >> if (mkfifo(u->filename, 0666) < 0) { >> pa_log("mkfifo('%s'): %s", u->filename, pa_cstrerror(errno)); >> goto fail; >> } >> - if ((u->fd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) { >> + >> + if ((u->corkfd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) { >> + pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); >> + goto fail; >> + } >> + >> + if ((u->fd = pa_open_cloexec(u->filename, O_RDONLY, 0)) < 0) { >> pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); >> goto fail; >> } >> @@ -289,12 +442,10 @@ int pa__init(pa_module *m) { >> pa_source_set_asyncmsgq(u->source, u->thread_mq.inq); >> pa_source_set_rtpoll(u->source, u->rtpoll); >> - pa_source_set_fixed_latency(u->source, pa_bytes_to_usec(pa_pipe_buf(u->fd), &u->source->sample_spec)); >> - u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1); >> - pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); >> - pollfd->fd = u->fd; >> - pollfd->events = pollfd->revents = 0; >> + u->pipe_size = pa_pipe_buf(u->fd); >> + u->pipe_size = PA_MIN(pa_mempool_block_size_max(u->core->mempool), u->pipe_size); >> + pa_source_set_fixed_latency(u->source, pa_bytes_to_usec(u->pipe_size, &u->source->sample_spec)); >> if (!(u->thread = pa_thread_new("pipe-source", thread_func, u))) { >> pa_log("Failed to create thread."); >> @@ -346,12 +497,6 @@ void pa__done(pa_module *m) { >> if (u->source) >> pa_source_unref(u->source); >> - if 
(u->memchunk.memblock) >> - pa_memblock_unref(u->memchunk.memblock); >> - >> - if (u->rtpoll_item) >> - pa_rtpoll_item_free(u->rtpoll_item); >> - >> if (u->rtpoll) >> pa_rtpoll_free(u->rtpoll); >> @@ -360,6 +505,12 @@ void pa__done(pa_module *m) { >> pa_xfree(u->filename); >> } >> + if (u->msg) >> + pa_xfree(u->msg); >> + >> + if (u->corkfd >= 0) >> + pa_assert_se(pa_close(u->corkfd) == 0); >> + >> if (u->fd >= 0) >> pa_assert_se(pa_close(u->fd) == 0); >> > > -- Raman