On 02/20/2018 11:04 PM, Georg Chini wrote:
> On 20.02.2018 19:49, Raman Shishniou wrote:
>> On 02/20/2018 07:02 PM, Georg Chini wrote:
>>> On 20.02.2018 16:38, Raman Shyshniou wrote:
>>>> Currently the pipe-source will remain running even if no
>>>> writer is connected and therefore no data is produced.
>>>> This patch adds the autosuspend=<bool> option to prevent this.
>>>> Source will stay suspended if no writer is connected.
>>>> This option is enabled by default.
>>>> ---
>>>> src/modules/module-pipe-source.c | 279 +++++++++++++++++++++++++++++----------
>>>> 1 file changed, 212 insertions(+), 67 deletions(-)
>>>>
>> I think I need to post simple pseudo code of the new thread loop because it
>> was completely rewritten. There are too many changes in one patch.
>> It can be difficult to see the whole picture of the new main loop.
>
> Well, I applied the patch and looked at the result. I still don't like the approach.
>
> I would propose this:
>
> auto_suspended = false;
> revents = 0
> events = POLLIN
>
> for (;;) {
>
>     /* This is the part that is run when the source is opened
>      * or auto suspended */
>     if (SOURCE_IS_OPENED(source) || auto_suspended) {
>
>         /* Check if we wake up from user suspend */
>         if (corkfd >= 0 && !auto_suspended) {
>             len = 0
>             close pipe for writing
>         }
>
>         /* We received POLLIN or POLLHUP or both */
>         if (revents) {
>
>             /* Read data from pipe */
>             len = read data
>
>             /* Got data, post it */
>             if (len > 0) {
>                 if (auto_suspended) {
>                     send unsuspend message
>                     auto_suspended = false
>                 }
>                 post data

We cannot post data here because the source is still suspended. Sending
the resume message is not enough to resume the source immediately:
source->thread_info.state is changed in this thread, i.e. during a poll
run, so we need to wait several poll runs until the source is actually
resumed. In the meantime we will see POLLIN and/or POLLHUP on every run
unless we remove the pipe fd from polling.
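
To illustrate the last point outside of pa_rtpoll, here is a minimal sketch
using plain poll(), assuming POSIX/Linux semantics. The helper names are
hypothetical and are not part of the patch. It shows that POLLHUP is
reported even when events is 0, so clearing events alone does not silence a
pipe whose writer is gone, while a negative fd makes poll() skip the entry.

```c
#include <poll.h>
#include <unistd.h>

/* Hypothetical helper, not from module-pipe-source.c: poll one fd once
 * without blocking and return the revents reported for it (-1 on error). */
int poll_once(int fd, short events) {
    struct pollfd p = { .fd = fd, .events = events, .revents = 0 };

    if (poll(&p, 1, 0) < 0)
        return -1;
    return p.revents;
}

/* Create a pipe, close its write end (the writer disconnects) and return
 * the revents seen on the read end. If skip_fd is nonzero, the entry is
 * disabled by passing a negative fd, which poll() ignores. */
int revents_after_writer_closed(short events, int skip_fd) {
    int fds[2];
    int r;

    if (pipe(fds) < 0)
        return -1;
    close(fds[1]);
    r = poll_once(skip_fd ? -1 : fds[0], events);
    close(fds[0]);
    return r;
}
```

Calling revents_after_writer_closed(0, 0) should still report POLLHUP,
whereas revents_after_writer_closed(POLLIN, 1) should report nothing. In the
patch the same effect is achieved by freeing the rtpoll item rather than by
a negative fd.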

>
>             /* Got POLLHUP or POLLIN without data, writer disconnected */
>             } else if (len == 0) {
>                 send suspend message
>                 auto_suspended = true
>             }
>
>             /* else Handle error */
>             else
>                 handle error
>
>             events = POLLIN
>         }
>
>     /* This is the part that is run, when the source is
>      * suspended by user */
>     } else {
>
>         if (corkfd < 0)
>             open pipe for writing
>         events = 0
>     }
>
>     /* This is run always */
>     run rtpoll
>     ... check errors ...
> }
>
> I think this is more consistent with how the other thread functions are
> written and also has a clearer structure.
>
>>
>> pollfd = NULL;
>> rtpoll_item = NULL;
>> chunk.length = 0; // length of pending data in chunk.memblock
>> chunk.memblock = pa_memblock_new(); // always allocated
>>
>> for (;;) {
>>
>>     /**************************************************
>>      * run rtpoll
>>      */
>>     if (chunk.length > 0) {
>>         /* we have pending data */
>>         if (rtpoll_item) {
>>             /* stop polling pipe
>>              * the only way to be here:
>>              * source was just suspended */
>>             ... free rtpoll_item ...
>>             rtpoll_item = NULL;
>>         }
>>     } else {
>>         /* we have no pending data */
>>         if (rtpoll_item == NULL) {
>>             /* start polling pipe
>>              * the only way to be here:
>>              * source was just resumed */
>>             ... allocate rtpoll_item, get pollfd ...
>>             pollfd->events = POLLIN;
>>             pollfd->fd = u->fd;
>>         }
>>     }
>>
>>     pa_rtpoll_run()
>>
>>     ... check errors ...
>>
>>     if (rtpoll_item) {
>>         /* we polled pipe */
>>         ... refresh pollfd ...
>>     } else
>>         /* we are waiting for source state changes */
>>         pollfd = NULL;
>>
>>     /**************************************************
>>      * Read data
>>      */
>>     if (pollfd && pollfd->revents) {
>>
>>         ... read data from pipe ...
>>
>>         if (len > 0) {
>>             chunk.length = len;
>>
>>             /* source is autosuspended? */
>>             if (u->corkfd >= 0) {
>>                 ... send resume message ...
>>                 close(u->corkfd);
>>                 u->corkfd = -1;
>>             }
>>         }
>>
>>         if (len == 0) {
>>             /* source not autosuspended? */
>>             if (u->corkfd < 0) {
>>                 ... send suspend message ...
>>                 u->corkfd = open(pipe, O_WRONLY);
>>             }
>>         }
>>
>>         if (len < 0) {
>>             ... check read error ...
>>         }
>>     }
>>
>>     /**************************************************
>>      * Post data
>>      */
>>     if (source is opened) {
>
> After an auto-unsuspend, the result of the above comparison is undefined.
> It may be that the main thread already processed the unsuspend message,
> maybe not.
>
>>         ... post data ...
>>         unref(chunk.memblock);
>>         chunk.memblock = pa_memblock_new();
>>         chunk.length = 0;
>>     }
>>
>>     /* chunk.length can be greater than 0 only if we are suspended right now
>>      * we need to stop polling pipe and wait while state will be changed to RUNNING or IDLE
>>      * to post pending data */
>
> If we are user suspended right now, the data should never be posted. Data that is read
> while we are user-suspended should be discarded. Data that arrives while we are
> auto-suspended should be posted.
>
>
>> }
>>
>> I can convert all changes to patch series if you want.
>>
>> --
>> Raman
>
>
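
One possible reading of the rule quoted above (discard data read while
user-suspended, post data that arrived while auto-suspended), combined with
the constraint that nothing can be posted before the source is opened
again, can be sketched as a small decision function. This is only an
illustration: the enums and the function name are hypothetical and are
neither PulseAudio types nor part of the patch.

```c
#include <stdbool.h>

/* Hypothetical model of the suspend reasons discussed in the thread. */
enum suspend_kind {
    NOT_SUSPENDED,
    AUTO_SUSPENDED,   /* suspended because no writer is connected */
    USER_SUSPENDED    /* suspended on user request */
};

enum chunk_action {
    CHUNK_POST,       /* hand the pending data to the source now */
    CHUNK_HOLD,       /* keep it until the source is opened again */
    CHUNK_DISCARD     /* drop it */
};

/* Decide what to do with data that has already been read from the pipe.
 * source_opened stands for the "source is opened" check in the pseudocode. */
enum chunk_action pending_chunk_action(enum suspend_kind kind, bool source_opened) {
    /* Data read while user-suspended is never posted. */
    if (kind == USER_SUSPENDED)
        return CHUNK_DISCARD;

    /* The source is running or idle, so posting is possible. */
    if (source_opened)
        return CHUNK_POST;

    /* Resume was requested but has not been processed yet:
     * keep the data for a later poll run. */
    return CHUNK_HOLD;
}
```

With such a helper, the "Post data" step would post only on CHUNK_POST and
reset chunk.length on both CHUNK_POST and CHUNK_DISCARD, so that polling of
the pipe restarts in either case.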