On 20.02.2018 19:49, Raman Shishniou wrote:
> On 02/20/2018 07:02 PM, Georg Chini wrote:
>> On 20.02.2018 16:38, Raman Shyshniou wrote:
>>> Currently the pipe-source will remain running even if no
>>> writer is connected and therefore no data is produced.
>>> This patch adds the autosuspend=<bool> option to prevent this.
>>> Source will stay suspended if no writer is connected.
>>> This option is enabled by default.
>>> ---
>>>  src/modules/module-pipe-source.c | 279 +++++++++++++++++++++++++++++----------
>>>  1 file changed, 212 insertions(+), 67 deletions(-)
>>>
> I think I need to post simple pseudocode of the new thread loop because it
> was completely rewritten. There are too many changes in one patch.
> It can be difficult to see the whole picture of the new main loop.

Well, I applied the patch and looked at the result. I still don't like
the approach. I would propose this:

auto_suspended = false;
revents = 0
events = POLLIN

for (;;) {

    /* This is the part that is run when the source is opened
     * or auto suspended */
    if (SOURCE_IS_OPENED(source) || auto_suspended) {

        /* Check if we wake up from user suspend */
        if (corkfd >= 0 && !auto_suspended) {
            len = 0
            close pipe for writing
        }

        /* We received POLLIN or POLLHUP or both */
        if (revents) {

            /* Read data from pipe */
            len = read data

            /* Got data, post it */
            if (len > 0) {
                if (auto_suspended) {
                    send unsuspend message
                    auto_suspended = false
                }
                post data

            /* Got POLLHUP or POLLIN without data, writer disconnected */
            } else if (len == 0) {
                send suspend message
                auto_suspended = true

            /* Read failed */
            } else
                handle error

            events = POLLIN
        }

    /* This is the part that is run when the source is
     * suspended by user */
    } else {
        if (corkfd < 0)
            open pipe for writing
        events = 0
    }

    /* This is run always */
    run rtpoll
    ... check errors ...
}

I think this is more consistent with how the other thread functions
are written and also has a clearer structure.

>
> pollfd = NULL;
> rtpoll_item = NULL;
> chunk.length = 0; // length of pending data in chunk.memblock
> chunk.memblock = pa_memblock_new(); // always allocated
>
> for (;;) {
>
>     /**************************************************
>      * run rtpoll
>      */
>     if (chunk.length > 0) {
>         /* we have pending data */
>         if (rtpoll_item) {
>             /* stop polling pipe
>              * the only way to be here:
>              * source was just suspended */
>             ... free rtpoll_item ...
>             rtpoll_item = NULL;
>         }
>     } else {
>         /* we have no pending data */
>         if (rtpoll_item == NULL) {
>             /* start polling pipe
>              * the only way to be here:
>              * source was just resumed */
>             ... allocate rtpoll_item, get pollfd ...
>             pollfd->events = POLLIN;
>             pollfd->fd = u->fd;
>         }
>     }
>
>     pa_rtpoll_run()
>
>     ... check errors ...
>
>     if (rtpoll_item) {
>         /* we polled pipe */
>         ... refresh pollfd ...
>     } else
>         /* we are waiting for source state changes */
>         pollfd = NULL;
>
>     /**************************************************
>      * Read data
>      */
>     if (pollfd && pollfd->revents) {
>
>         ... read data from pipe ...
>
>         if (len > 0) {
>             chunk.length = len;
>
>             /* source is autosuspended? */
>             if (u->corkfd >= 0) {
>                 ... send resume message ...
>                 close(u->corkfd);
>                 u->corkfd = -1;
>             }
>         }
>
>         if (len == 0) {
>             /* source not autosuspended? */
>             if (u->corkfd < 0) {
>                 ... send suspend message ...
>                 u->corkfd = open(pipe, O_WRONLY);
>             }
>         }
>
>         if (len < 0) {
>             ... check read error ...
>         }
>     }
>
>     /**************************************************
>      * Post data
>      */
>     if (source is opened) {

After an auto-unsuspend, the result of the above comparison is
undefined. The main thread may or may not have processed the
unsuspend message by this point.

>         ... post data ...
>         unref(chunk.memblock);
>         chunk.memblock = pa_memblock_new();
>         chunk.length = 0;
>     }
>
>     /* chunk.length can be greater than 0 only if we are suspended right now
>      * we need to stop polling pipe and wait while state will be changed to RUNNING or IDLE
>      * to post pending data */

If we are user suspended right now, the data should never be posted.
Data that is read while we are user-suspended should be discarded.
Data that arrives while we are auto-suspended should be posted.

> }
>
> I can convert all changes to patch series if you want.
>
> --
> Raman
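
To make the rule for read results concrete (discard while user-suspended, post while auto-suspended), here is a minimal C sketch of that decision alone. It is only an illustration under assumptions: `struct pipe_state`, `enum pipe_action` and `handle_read()` are hypothetical names, not PulseAudio API, and the sketch leaves out rtpoll, corkfd handling and the actual message passing.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of the I/O thread state; not PulseAudio API. */
struct pipe_state {
    bool user_suspended;  /* source was suspended by the user */
    bool auto_suspended;  /* source was suspended because the writer left */
};

/* What the thread should do with the result of one read on the pipe. */
enum pipe_action {
    ACTION_POST,               /* post the data */
    ACTION_UNSUSPEND_AND_POST, /* send unsuspend message, then post the data */
    ACTION_DISCARD,            /* drop the data */
    ACTION_AUTO_SUSPEND,       /* send suspend message */
    ACTION_NONE,               /* nothing to do */
    ACTION_ERROR               /* read failed */
};

/* len models the read result: > 0 data, 0 writer disconnected, < 0 error. */
enum pipe_action handle_read(struct pipe_state *s, long len) {
    assert(s != NULL);

    if (len < 0)
        return ACTION_ERROR;

    /* User suspend wins: data is discarded and never posted later. */
    if (s->user_suspended)
        return len > 0 ? ACTION_DISCARD : ACTION_NONE;

    if (len == 0) {
        /* Writer disconnected; suspend only once. */
        if (s->auto_suspended)
            return ACTION_NONE;
        s->auto_suspended = true;
        return ACTION_AUTO_SUSPEND;
    }

    /* Data arrived while auto-suspended: wake up and post it. */
    if (s->auto_suspended) {
        s->auto_suspended = false;
        return ACTION_UNSUSPEND_AND_POST;
    }

    return ACTION_POST;
}
```

The point of keeping the auto-suspend flag in the thread itself is that the decision no longer depends on whether the main thread has already processed the unsuspend message.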