Sorry for the missing annotation; here it is. This is a follow-up to the thread: [PATCH] "pipe-sink" module timing based upon "null-sink" module mechanics. As proposed, the old "pipe-sink" behaviour has been preserved, and a new option, "use_system_clock_for_timing", enables the new behaviour. For example: ... load-module module-pipe-sink sink_name=pipe_sink format=s16le rate=44100 channels=2 use_system_clock_for_timing=true ... With this option, even the simplest tools like "cat" can properly dump raw audio from the pipe. Additionally, logging of dropped bytes of audio data was implemented as suggested: "Yeah, it's not good to log every dropped chunk. Maybe log only the first dropped chunk and later the total dropped amount when the pipe starts accepting writes again". I also added a reset of the "bytes_dropped" statistics on every resume of the "pipe-sink" module from the suspended state. Afterwards I would like to add "auto-draining" of the pipe, as already proposed, on each resume and maybe also when the pipe is full. Lastly, an additional option might be useful too: reuse-existing-pipe.
Regards, Samo Dne 09.12.2017 (sob) ob 20:09 +0100 je Samo Pogačnik napisal(a): > --- >  src/modules/module-pipe-sink.c | 236 > ++++++++++++++++++++++++++++++++++++++--- >  1 file changed, 221 insertions(+), 15 deletions(-) > > diff --git a/src/modules/module-pipe-sink.c b/src/modules/module- > pipe-sink.c > index 8396a63..e2bf949 100644 > --- a/src/modules/module-pipe-sink.c > +++ b/src/modules/module-pipe-sink.c > @@ -33,7 +33,9 @@ >  #include <sys/filio.h> >  #endif >  > -#include <pulse/xmalloc.h> > +#include <pulse/timeval.h> > +#include <pulse/util.h> > +#include <pulse/rtclock.h> >  >  #include <pulsecore/core-error.h> >  #include <pulsecore/sink.h> > @@ -59,7 +61,9 @@ PA_MODULE_USAGE( >          "format=<sample format> " >          "rate=<sample rate> " >          "channels=<number of channels> " > -        "channel_map=<channel map>"); > +        "channel_map=<channel map> " > +        "use_system_clock_for_timing=<yes or no> " > +); >  >  #define DEFAULT_FILE_NAME "fifo_output" >  #define DEFAULT_SINK_NAME "fifo_output" > @@ -76,12 +80,19 @@ struct userdata { >      char *filename; >      int fd; >      size_t buffer_size; > +    size_t bytes_dropped; > +    bool log_dropped; > +    bool pipe_sink_open; >  >      pa_memchunk memchunk; >  >      pa_rtpoll_item *rtpoll_item; >  >      int write_type; > +    pa_usec_t block_usec; > +    pa_usec_t timestamp; > + > +    bool use_system_clock_for_timing; >  }; >  >  static const char* const valid_modargs[] = { > @@ -92,6 +103,7 @@ static const char* const valid_modargs[] = { >      "rate", >      "channels", >      "channel_map", > +    "use_system_clock_for_timing", >      NULL >  }; >  > @@ -99,27 +111,148 @@ static int sink_process_msg(pa_msgobject *o, > int code, void *data, int64_t offse >      struct userdata *u = PA_SINK(o)->userdata; >  >      switch (code) { > +        case PA_SINK_MESSAGE_SET_STATE: > +            if (pa_sink_get_state(u->sink) == PA_SINK_SUSPENDED || > pa_sink_get_state(u->sink) == 
PA_SINK_INIT) { > +                if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING || > PA_PTR_TO_UINT(data) == PA_SINK_IDLE) > +                    u->timestamp = pa_rtclock_now(); > +            } > +            break; >  > -        case PA_SINK_MESSAGE_GET_LATENCY: { > -            size_t n = 0; > +        case PA_SINK_MESSAGE_GET_LATENCY: > +            if (u->use_system_clock_for_timing) { > +                pa_usec_t now; > +                now = pa_rtclock_now(); > +                *((int64_t*) data) = (int64_t)u->timestamp - > (int64_t)now; > +            } else { > +                size_t n = 0; >  >  #ifdef FIONREAD > -            int l; > +                int l; >  > -            if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0) > -                n = (size_t) l; > +                if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0) > +                    n = (size_t) l; >  #endif >  > -            n += u->memchunk.length; > +                n += u->memchunk.length; >  > -            *((int64_t*) data) = pa_bytes_to_usec(n, &u->sink- > >sample_spec); > +                *((int64_t*) data) = pa_bytes_to_usec(n, &u->sink- > >sample_spec); > +            } >              return 0; > -        } >      } >  >      return pa_sink_process_msg(o, code, data, offset, chunk); >  } >  > +static void sink_update_requested_latency_cb(pa_sink *s) { > +    struct userdata *u; > +    size_t nbytes; > + > +    pa_sink_assert_ref(s); > +    pa_assert_se(u = s->userdata); > + > +    u->block_usec = pa_sink_get_requested_latency_within_thread(s); > + > +    if (u->block_usec == (pa_usec_t) -1) > +        u->block_usec = s->thread_info.max_latency; > + > +    nbytes = pa_usec_to_bytes(u->block_usec, &s->sample_spec); > +    pa_sink_set_max_rewind_within_thread(s, nbytes); > +    pa_sink_set_max_request_within_thread(s, nbytes); > +} > + > +static size_t pipe_sink_write(struct userdata *u, void *p) { > +    ssize_t l; > +    size_t index, length; > + > +    pa_assert(u); > + > +    index = 
u->memchunk.index; > +    length = u->memchunk.length; > + > +    for (;;) { > +        l = pa_write(u->fd, (uint8_t*) p + index, length, &u- > >write_type); > + > +        pa_assert(l != 0); > + > +        if (l < 0) { > +            if (errno == EAGAIN) > +                break; > +            else if (errno != EINTR) { > +                pa_log("Failed to write data to FIFO (%d): %s", > errno, pa_cstrerror(errno)); > +                break; > +            } > +        } else { > +            index += (size_t) l; > +            length -= (size_t) l; > + > +            if (length <= 0) { > +                break; > +            } > +        } > +    } > + > +    return index; > +} > + > +static void process_render_use_timing(struct userdata *u, pa_usec_t > now) { > +    size_t dropped = 0; > +    size_t consumed = 0; > + > +    pa_assert(u); > + > +    if (!u->pipe_sink_open) { > +        /* Clear bytes_dropped statistics. */ > +        pa_log_debug("pipe-sink cleared dropped bytes statistics > (sum: %zu -> 0 dropped bytes)", u->bytes_dropped); > +        u->bytes_dropped = 0; > +        u->log_dropped = true; > +        u->pipe_sink_open = true; > +    } > + > +    /* This is the configured latency. Sink inputs connected to us > +    might not have a single frame more than the maxrequest value > +    queued. Hence: at maximum read this many bytes from the sink > +    inputs. 
*/ > + > +    /* Fill the buffer up the latency size */ > +    while (u->timestamp < now + u->block_usec) { > +        void *p; > +        size_t written = 0; > + > +        pa_sink_render(u->sink, u->sink->thread_info.max_request, > &u->memchunk); > + > +        pa_assert(u->memchunk.length > 0); > + > +        p = pa_memblock_acquire(u->memchunk.memblock); > + > +        written = pipe_sink_write(u, p); > + > +        pa_memblock_release(u->memchunk.memblock); > + > +        pa_memblock_unref(u->memchunk.memblock); > + > +        u->timestamp += pa_bytes_to_usec(u->memchunk.length, &u- > >sink->sample_spec); > + > +        dropped += (u->memchunk.length - written); > +        consumed += u->memchunk.length; > + > +        if (consumed >= u->sink->thread_info.max_request) > +            break; > +    } > + > +    u->bytes_dropped += dropped; > + > +    if (dropped == 0) { > +        if (!u->log_dropped) > +            pa_log_debug("pipe-sink stopped dropping (sum: %zu > dropped bytes)", u->bytes_dropped); > +        u->log_dropped = true; > +    } > + > +    if ((dropped != 0) && u->log_dropped) { > +        pa_log_debug("pipe-sink just dropped %zu bytes (sum: %zu > dropped bytes)", dropped, u->bytes_dropped); > +        u->log_dropped = false; > +    } > +} > + >  static int process_render(struct userdata *u) { >      pa_assert(u); >  > @@ -164,6 +297,56 @@ static int process_render(struct userdata *u) { >      } >  } >  > +static void thread_func_use_timing(void *userdata) { > +    struct userdata *u = userdata; > + > +    pa_assert(u); > + > +    pa_log_debug("Thread (use timing) starting up"); > + > +    pa_thread_mq_install(&u->thread_mq); > + > +    u->timestamp = pa_rtclock_now(); > + > +    for (;;) { > +        pa_usec_t now = 0; > +        int ret; > + > +        if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) > +            now = pa_rtclock_now(); > + > +        if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) > +            
pa_sink_process_rewind(u->sink, 0); > + > +        /* Render some data and write it to the fifo */ > +        if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) { > +            if (u->timestamp <= now) > +                process_render_use_timing(u, now); > + > +            pa_rtpoll_set_timer_absolute(u->rtpoll, u->timestamp); > +        } else { > +            pa_rtpoll_set_timer_disabled(u->rtpoll); > +            u->pipe_sink_open = false; > +        } > + > +        /* Hmm, nothing to do. Let's sleep */ > +        if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) > +            goto fail; > + > +        if (ret == 0) > +            goto finish; > +    } > + > +fail: > +    /* If this was no regular exit from the loop we have to continue > +     * processing messages until we received PA_MESSAGE_SHUTDOWN */ > +    pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), > PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); > +    pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); > + > +finish: > +    pa_log_debug("Thread (use timing) shutting down"); > +} > + >  static void thread_func(void *userdata) { >      struct userdata *u = userdata; >  > @@ -227,6 +410,7 @@ int pa__init(pa_module *m) { >      pa_modargs *ma; >      struct pollfd *pollfd; >      pa_sink_new_data data; > +    pa_thread_func_t thread_routine; >  >      pa_assert(m); >  > @@ -249,6 +433,11 @@ int pa__init(pa_module *m) { >      pa_memchunk_reset(&u->memchunk); >      u->rtpoll = pa_rtpoll_new(); >  > +    if (pa_modargs_get_value_boolean(ma, > "use_system_clock_for_timing", &u->use_system_clock_for_timing) < 0) > { > +        pa_log("Failed to parse use_system_clock_for_timing > argument."); > +        goto fail; > +    } > + >      if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u- > >rtpoll) < 0) { >          pa_log("pa_thread_mq_init() failed."); >          goto fail; > @@ -294,7 +483,10 @@ int pa__init(pa_module *m) { >          goto fail; >      } >  > -    
u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY); > +    if (u->use_system_clock_for_timing) > +        u->sink = pa_sink_new(m->core, &data, > PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY); > +    else > +        u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY); >      pa_sink_new_data_done(&data); >  >      if (!u->sink) { > @@ -303,21 +495,34 @@ int pa__init(pa_module *m) { >      } >  >      u->sink->parent.process_msg = sink_process_msg; > +    if (u->use_system_clock_for_timing) > +        u->sink->update_requested_latency = > sink_update_requested_latency_cb; >      u->sink->userdata = u; >  >      pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq); >      pa_sink_set_rtpoll(u->sink, u->rtpoll); >  > +    u->bytes_dropped = 0; > +    u->log_dropped = true; > +    u->pipe_sink_open = false; >      u->buffer_size = pa_frame_align(pa_pipe_buf(u->fd), &u->sink- > >sample_spec); > +    if (u->use_system_clock_for_timing) { > +        u->block_usec = pa_bytes_to_usec(u->buffer_size, &u->sink- > >sample_spec); > +        pa_sink_set_latency_range(u->sink, 0, u->block_usec); > +        pa_sink_set_max_rewind(u->sink, u->buffer_size); > +        thread_routine = thread_func_use_timing; > +    } else { > +        pa_sink_set_fixed_latency(u->sink, pa_bytes_to_usec(u- > >buffer_size, &u->sink->sample_spec)); > +        thread_routine = thread_func; > +    } >      pa_sink_set_max_request(u->sink, u->buffer_size); > -    pa_sink_set_fixed_latency(u->sink, pa_bytes_to_usec(u- > >buffer_size, &u->sink->sample_spec)); >  >      u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, > 1); >      pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); >      pollfd->fd = u->fd; >      pollfd->events = pollfd->revents = 0; >  > -    if (!(u->thread = pa_thread_new("pipe-sink", thread_func, u))) { > +    if (!(u->thread = pa_thread_new("pipe-sink", thread_routine, > u))) { >          pa_log("Failed to create thread."); >          goto fail; >      } > @@ -367,8 
+572,9 @@ void pa__done(pa_module *m) { >      if (u->sink) >          pa_sink_unref(u->sink); >  > -    if (u->memchunk.memblock) > -        pa_memblock_unref(u->memchunk.memblock); > +    if (!u->use_system_clock_for_timing) > +        if (u->memchunk.memblock) > +            pa_memblock_unref(u->memchunk.memblock); >  >      if (u->rtpoll_item) >          pa_rtpoll_item_free(u->rtpoll_item);