On 02/22/2018 10:18 PM, Georg Chini wrote: >> - /* Hmm, nothing to do. Let's sleep */ >> - pollfd->events = (short) (u->source->thread_info.state == PA_SOURCE_RUNNING ? POLLIN : 0); >> + /* Post data to source, discard data or wait for state transition to be complete */ >> + if (PA_SOURCE_IS_OPENED(u->source->thread_info.state)) { >> + >> + if (u->writer_connected && u->corkfd >= 0) { >> + pa_assert_se(pa_close(u->corkfd) == 0); >> + u->corkfd = -1; >> + } >> + >> + if (u->memchunk.length > 0) { >> + pa_source_post(u->source, &u->memchunk); >> + pa_memblock_unref(u->memchunk.memblock); >> + u->memchunk.memblock = pa_memblock_new(u->core->mempool, u->pipe_size); >> + u->memchunk.length = 0; >> + } >> + >> + } else if (u->suspended_by_user) > > You have to open the corkfd here as well to avoid POLLHUP spam if the writer > disconnects during user suspend. > During user suspend we are doing normal polling. If the writer disconnects, we'll see it by reading 0 bytes and corkfd will be opened. >> + u->memchunk.length = 0; >> + >> + >> + pollfd->events = u->memchunk.length ? 0 : POLLIN; > > How do you wake up from autosuspend? When the writer disconnects, > memchunk.length will be 0 and so you do no longer listen for POLLIN. > I guess you need to know the current suspend cause here: > user suspended -> events = 0 > (only auto suspended + no writer connected) or running -> events = POLLIN > only auto suspended + writer connected (=wake up transition) -> events = 0 > Yes. This is what I do: keep the data in memchunk and wait until the source is resumed. - Source autosuspended, corkfd opened, rtpoll waits for POLLIN - Someone connected and sent us data. - pa_rtpoll_run() returned and pollfd->revents = POLLIN First iteration: - pollfd->revents == POLLIN - l = read() - l > 0: - memchunk.length = l - writer_connected is false - send PIPE_SOURCE_RESUME message - set writer_connected = true, ! do not close corkfd here to avoid POLLHUP spam ! 
- Data cannot be posted because the source is still suspended, i.e. we have just sent the resume message to the main thread and thread_info.state has not changed yet. - Data cannot be discarded - the source is auto suspended, not suspended by user. - So we need to wait for the source to be resumed (and keep data in memchunk): - events = 0 - pa_rtpoll_run(): corkfd still opened, but events = 0 - we are waiting for our resume message to be processed. Next iteration: - resume message was processed - pollfd->events = 0 (pollfd->events was set to 0 in the previous iteration) - Source was resumed: - writer_connected is true and corkfd still opened: - close corkfd - post data to source - allocate new buffer - memchunk.length = 0 - We have processed the pending data (memchunk.length == 0) - set events = POLLIN - pa_rtpoll_run(): corkfd closed, events = POLLIN - we are waiting for new data >> if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) >> goto fail; >> @@ -188,8 +281,13 @@ static void thread_func(void *userdata) { >> pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); >> - if (pollfd->revents & ~POLLIN) { >> - pa_log("FIFO shutdown."); >> + if (pollfd->revents & ~(POLLIN|POLLHUP)) { >> + pa_log("FD error: %s%s%s%s", >> + pollfd->revents & POLLOUT ? "POLLOUT " : "", >> + pollfd->revents & POLLERR ? "POLLERR " : "", >> + pollfd->revents & POLLPRI ? "POLLPRI " : "", >> + pollfd->revents & POLLNVAL ? 
"POLLNVAL " : ""); >> + >> goto fail; >> } >> } >> @@ -238,13 +336,26 @@ int pa__init(pa_module *m) { >> goto fail; >> } >> + if (!(u->msg = pa_msgobject_new(pipe_source_msg))) >> + goto fail; >> + >> + u->msg->parent.process_msg = pipe_source_process_msg; >> + >> u->filename = pa_runtime_path(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME)); >> if (mkfifo(u->filename, 0666) < 0) { >> pa_log("mkfifo('%s'): %s", u->filename, pa_cstrerror(errno)); >> goto fail; >> } >> - if ((u->fd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) { >> + >> + if ((u->corkfd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) { >> + pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); >> + goto fail; >> + } >> + >> + u->writer_connected = true; >> + >> + if ((u->fd = pa_open_cloexec(u->filename, O_RDONLY, 0)) < 0) { >> pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); >> goto fail; >> } >> @@ -289,13 +400,18 @@ int pa__init(pa_module *m) { >> pa_source_set_asyncmsgq(u->source, u->thread_mq.inq); >> pa_source_set_rtpoll(u->source, u->rtpoll); >> - pa_source_set_fixed_latency(u->source, pa_bytes_to_usec(pa_pipe_buf(u->fd), &u->source->sample_spec)); >> + >> + u->pipe_size = pa_pipe_buf(u->fd); >> + u->pipe_size = PA_MIN(pa_mempool_block_size_max(m->core->mempool), u->pipe_size); >> + pa_source_set_fixed_latency(u->source, pa_bytes_to_usec(u->pipe_size, &u->source->sample_spec)); >> u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1); >> pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); >> pollfd->fd = u->fd; >> pollfd->events = pollfd->revents = 0; >> + u->suspended_by_user = false; >> + >> if (!(u->thread = pa_thread_new("pipe-source", thread_func, u))) { >> pa_log("Failed to create thread."); >> goto fail; >> @@ -360,6 +476,12 @@ void pa__done(pa_module *m) { >> pa_xfree(u->filename); >> } >> + if (u->msg) >> + pa_xfree(u->msg); >> + >> + if (u->corkfd >= 0) >> + pa_assert_se(pa_close(u->corkfd) == 0); >> + >> if (u->fd >= 0) >> 
pa_assert_se(pa_close(u->fd) == 0); >> > > I believe you need some user_unsuspended and/or auto_unsuspended > flag that is set in the SET_STATE handler to handle the first data read > after suspend correctly. > > When you unsuspend from auto suspend, the first chunk you already read > from the pipe should be posted - as far as I can see this happens. > > However, when you unsuspend from user suspend, the data in the pipe may > be old - you did not read anything during suspend. Therefore the first chunk > you read should be discarded. That does not happen as far as I can tell. Or > am I overlooking something (again)? > > The general approach of the patch looks OK now. > -- Raman