On 02/20/2018 11:04 PM, Georg Chini wrote: > On 20.02.2018 19:49, Raman Shishniou wrote: >> On 02/20/2018 07:02 PM, Georg Chini wrote: >>> On 20.02.2018 16:38, Raman Shyshniou wrote: >>>> Currently the pipe-source will remain running even if no >>>> writer is connected and therefore no data is produced. >>>> This patch adds the autosuspend=<bool> option to prevent this. >>>> Source will stay suspended if no writer is connected. >>>> This option is enabled by default. >>>> --- >>>> src/modules/module-pipe-source.c | 279 +++++++++++++++++++++++++++++---------- >>>> 1 file changed, 212 insertions(+), 67 deletions(-) >>>> >> I think I need post a simple pseudo code of new thread loop because it >> was completely rewritten. There are too many changes in one patch. >> It can be difficult to see the whole picture of new main loop. > > Well, I applied the patch and looked at the result. I still don't like the approach. > > I would propose this: > > auto_suspended = false; > revents = 0 > events = POLLIN > > for (;;) { > > /* This is the part that is run when the source is opened > * or auto suspended > if (SOURCE_IS_OPENED(source) || auto_suspended) { > > /* Check if we wake up from user suspend */ > if (corkfd >= 0 && !auto_suspended) { > len = 0 > close pipe for writing > } > > /* We received POLLIN or POLLHUP or both */ > if (revents) { > > /* Read data from pipe */ > len = read data > > /* Got data, post it */ > if (len > 0) { > if (auto_suspend) { > send unsuspend message > auto_suspend = false > } > post data > > /* Got POLLHUP or POLLIN without data, writer disconnected */ > } else if (len = 0) { > send suspend message > auto_suspend = true > } > > /* else Handle error */ > else > handle error > > events = POLLIN > } > > /* This is the part that is run, when the source is > * suspended by user */ > } else { > > if (corkfd < 0) > open pipe for writing > events = 0 > } > > /* This is run always */ > run rtpoll > ... check errors ... > } > > I think this is more consistent with how the other thread functions are > written and also has a clearer structure. > Ok. I'll try to do something like this. >> >> pollfd = NULL; >> rtpoll_item = NULL; >> chunk.length = 0; // length of pending data in chunk.memblock >> chunk.memblock = pa_memblock_new(); // always allocated >> >> for (;;) { >> >> /************************************************** >> * run rtpoll >> */ >> if (chunk.length > 0) { >> /* we have a pending data */ >> if (rtpoll_item) { >> /* stop polling pipe >> * the only way to be here: >> * source was just suspended */ >> ... free rtpoll_item ... >> rtpoll_item = NULL; >> } >> } else { >> /* we have no pending data */ >> if (rtpoll_item == NULL) { >> /* start polling pipe >> * the only way to be here: >> * source was just resumed */ >> ... allocate rtpoll_item, get pollfd ... >> pollfd->events = POLLIN; >> pollfd->fd = u->fd; >> } >> } >> >> pa_rtpoll_run() >> >> ... check errors ... >> >> if (rtpoll_item) { >> /* we polled pipe */ >> ... refresh pollfd ... >> } else >> /* we are waiting for source state changes */ >> pollfd = NULL; >> >> /************************************************** >> * Read data >> */ >> if (pollfd && pollfd->revents) { >> >> ... read data from pipe ... >> >> if (len > 0) { >> chunk.length = len; >> >> /* source is autosuspended? */ >> if (u->corkfd >= 0) { >> ... send resume message ... >> close(u->corkfd); >> u->corkfd = -1; >> } >> } >> >> if (len == 0) { >> /* sourece not autosuspended? */ >> if (u->corkfd < 0) { >> ... send suspend message ... >> u->corkfd = open(pipe, O_WRONLY); >> } >> } >> >> if (len < 0) { >> ... check read error ... >> } >> } >> >> /************************************************** >> * Post data >> */ >> if (source is opened) { > > After an auto-unsuspend, the result of the above comparison is undefined. > It may be that the main thread already processed the unsuspend message, > maybe not. The above comparison check thread_info.state. This value changed in i/o thread context, i.e. during pa_rtpoll_run(). It will be PA_SOURCE_SUSPENDED right after we send a message and becomes PA_SOURCE_RUNNING / PA_SOURCE_IDLE after one or two next poll runs. > >> ... post data ... >> unref(chunk.memblock); >> chunk.memblock = pa_memblock_new(); >> chunk.length = 0; >> } >> >> /* chunk.length can be greater than 0 only if we are suspended right now >> * we need to stop polling pipe and wait while state will be changed to RUNNING or IDLE >> * to post pending data */ > > If we are user suspended right now, the data should never be posted. Data that is read > while we are user-suspended should be discarded. Data that arrives while we are > auto-suspended should be posted. > > >> } >> >> I can convert all changes to patch series if you want. >> -- Raman