Thank you for the remarks. I tried to correct all issues and below is a new patch. Dne 12.12.2017 (tor) ob 02:04 +0200 je Tanu Kaskinen napisal(a): > On Sat, 2017-12-09 at 20:09 +0100, Samo PogaÄ?nik wrote: > > > > --- > >  src/modules/module-pipe-sink.c | 236 > > ++++++++++++++++++++++++++++++++++++++--- > >  1 file changed, 221 insertions(+), 15 deletions(-) > > > > diff --git a/src/modules/module-pipe-sink.c b/src/modules/module- > > pipe-sink.c > > index 8396a63..e2bf949 100644 > > --- a/src/modules/module-pipe-sink.c > > +++ b/src/modules/module-pipe-sink.c > >  } > >  > > +static void sink_update_requested_latency_cb(pa_sink *s) { > > +    struct userdata *u; > > +    size_t nbytes; > > + > > +    pa_sink_assert_ref(s); > > +    pa_assert_se(u = s->userdata); > > + > > +    u->block_usec = > > pa_sink_get_requested_latency_within_thread(s); > > + > > +    if (u->block_usec == (pa_usec_t) -1) > > +        u->block_usec = s->thread_info.max_latency; > > + > > +    nbytes = pa_usec_to_bytes(u->block_usec, &s->sample_spec); > > +    pa_sink_set_max_rewind_within_thread(s, nbytes); > The sink doesn't support rewinding, so you shouldn't set max_rewind. Removed line. > > > > > +    pa_sink_set_max_request_within_thread(s, nbytes); > > +} > > + > > +static size_t pipe_sink_write(struct userdata *u, void *p) { > p comes from u->memchunk, so it seems weird to me to pass it as a > parameter when it's available via the userdata. You probably did this > way because you call pa_memblock_acquire() before calling this > function, but I'd prefer doing the pa_memblock_acquire() and > pa_memblock_release() calls in this function. > > I would also like to use a stack-allocated pa_memchunk instead of > u->memchunk, because using u->memchunk gives the impression that > u->memchunk carries some state that needs to be preserved, while in > reality u->memchunk is reset every time process_render_use_timing() > is > called and it's not used outside process_render_use_timing() or > pipe_sink_write(). Done. > > > > > +    ssize_t l; > > +    size_t index, length; > > + > > +    pa_assert(u); > > + > > +    index = u->memchunk.index; > > +    length = u->memchunk.length; > > + > > +    for (;;) { > > +        l = pa_write(u->fd, (uint8_t*) p + index, length, &u- > > >write_type); > > + > > +        pa_assert(l != 0); > > + > > +        if (l < 0) { > > +            if (errno == EAGAIN) > > +                break; > > +            else if (errno != EINTR) { > > +                pa_log("Failed to write data to FIFO (%d): %s", > > errno, pa_cstrerror(errno)); > > +                break; > The caller is not notified about the error, so if there's some > problem > with the FIFO, we'll keep try writing to it and the log will be > spammed > with error messages. > > The module should at least stop writing until it's suspended and > started again, or maybe it should even unload itself. > Done module unload on error. Error gets notified via negative number of potentially already written bytes increased by one. > > > > +            } > > +        } else { > > +            index += (size_t) l; > > +            length -= (size_t) l; > > + > > +            if (length <= 0) { > > +                break; > > +            } > > +        } > > +    } > > + > > +    return index; > You seem to assume that u->memchunk.index is always zero. The > memchunk > returned by pa_sink_render() may contain any index. > Fixed. > > > > +} > > + > > +static void process_render_use_timing(struct userdata *u, > > pa_usec_t now) { > > +    size_t dropped = 0; > > +    size_t consumed = 0; > > + > > +    pa_assert(u); > > + > > +    if (!u->pipe_sink_open) { > > +        /* Clear bytes_dropped statistics. */ > > +        pa_log_debug("pipe-sink cleared dropped bytes statistics > > (sum: %zu -> 0 dropped bytes)", u->bytes_dropped); > > +        u->bytes_dropped = 0; > > +        u->log_dropped = true; > > +        u->pipe_sink_open = true; > I think this would fit better in the PA_SINK_MESSAGE_SET_STATE > handler. > u->pipe_sink_open won't be needed then. > Moved code as suggested and removed "log_dropped" and "pipe_sink_open" params. > > > > +    } > > + > > +    /* This is the configured latency. Sink inputs connected to us > > +    might not have a single frame more than the maxrequest value > > +    queued. Hence: at maximum read this many bytes from the sink > > +    inputs. */ > > + > > +    /* Fill the buffer up the latency size */ > > +    while (u->timestamp < now + u->block_usec) { > > +        void *p; > > +        size_t written = 0; > > + > > +        pa_sink_render(u->sink, u->sink->thread_info.max_request, > > &u->memchunk); > > + > > +        pa_assert(u->memchunk.length > 0); > > + > > +        p = pa_memblock_acquire(u->memchunk.memblock); > > + > > +        written = pipe_sink_write(u, p); > > + > > +        pa_memblock_release(u->memchunk.memblock); > > + > > +        pa_memblock_unref(u->memchunk.memblock); > > + > > +        u->timestamp += pa_bytes_to_usec(u->memchunk.length, &u- > > >sink->sample_spec); > > + > > +        dropped += (u->memchunk.length - written); > > +        consumed += u->memchunk.length; > > + > > +        if (consumed >= u->sink->thread_info.max_request) > > +            break; > > +    } > > + > > +    u->bytes_dropped += dropped; > > + > > +    if (dropped == 0) { > > +        if (!u->log_dropped) > > +            pa_log_debug("pipe-sink stopped dropping (sum: %zu > > dropped bytes)", u->bytes_dropped); > > +        u->log_dropped = true; > > +    } > > + > > +    if ((dropped != 0) && u->log_dropped) { > > +        pa_log_debug("pipe-sink just dropped %zu bytes (sum: %zu > > dropped bytes)", dropped, u->bytes_dropped); > > +        u->log_dropped = false; > > +    } > I think this logging code belongs inside the loop where > pipe_sink_write() is called. Consider the (unlikely) case where two > subsequent pipe_sink_write() calls write the audio partially: in that > case there are two separate drops, but your current code treats them > as > one continuous drop. > > You seem to aggregate all drops in u->bytes_dropped for the duration > when the sink is open, but I think it would be more useful to count > only the bytes of one continuos drop. Now it's not known how long the > individual holes in the stream are. > Changed as suggested. User structure only stores number of continuously dropped bytes until each dropping break and until entering suspend while continuously dropping. > > > > +} > > + > >  static int process_render(struct userdata *u) { > >      pa_assert(u); > >  > >  > >      if (!u->sink) { > > @@ -303,21 +495,34 @@ int pa__init(pa_module *m) { > >      } > >  > >      u->sink->parent.process_msg = sink_process_msg; > > +    if (u->use_system_clock_for_timing) > > +        u->sink->update_requested_latency = > > sink_update_requested_latency_cb; > >      u->sink->userdata = u; > >  > >      pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq); > >      pa_sink_set_rtpoll(u->sink, u->rtpoll); > >  > > +    u->bytes_dropped = 0; > > +    u->log_dropped = true; > > +    u->pipe_sink_open = false; > >      u->buffer_size = pa_frame_align(pa_pipe_buf(u->fd), &u->sink- > > >sample_spec); > > +    if (u->use_system_clock_for_timing) { > > +        u->block_usec = pa_bytes_to_usec(u->buffer_size, &u->sink- > > >sample_spec); > > +        pa_sink_set_latency_range(u->sink, 0, u->block_usec); > > +        pa_sink_set_max_rewind(u->sink, u->buffer_size); > max_rewind shouldn't be set. > Removed line. Patch: ------ diff --git a/src/modules/module-pipe-sink.c b/src/modules/module-pipe-sink.c index 64ef807..d955583 100644 --- a/src/modules/module-pipe-sink.c +++ b/src/modules/module-pipe-sink.c @@ -34,6 +34,9 @@  #endif   #include <pulse/xmalloc.h> +#include <pulse/timeval.h> +#include <pulse/util.h> +#include <pulse/rtclock.h>   #include <pulsecore/core-error.h>  #include <pulsecore/sink.h> @@ -57,7 +60,9 @@ PA_MODULE_USAGE(          "format=<sample format> "          "rate=<sample rate> "          "channels=<number of channels> " -        "channel_map=<channel map>"); +        "channel_map=<channel map> " +        "use_system_clock_for_timing=<yes or no> " +);   #define DEFAULT_FILE_NAME "fifo_output"  #define DEFAULT_SINK_NAME "fifo_output" @@ -74,12 +79,17 @@ struct userdata {      char *filename;      int fd;      size_t buffer_size; +    size_t bytes_dropped;       pa_memchunk memchunk;       pa_rtpoll_item *rtpoll_item;       int write_type; +    pa_usec_t block_usec; +    pa_usec_t timestamp; + +    bool use_system_clock_for_timing;  };   static const char* const valid_modargs[] = { @@ -90,6 +100,7 @@ static const char* const valid_modargs[] = {      "rate",      "channels",      "channel_map", +    "use_system_clock_for_timing",      NULL  };  @@ -97,27 +108,159 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse      struct userdata *u = PA_SINK(o)->userdata;       switch (code) { +        case PA_SINK_MESSAGE_SET_STATE: +            if (pa_sink_get_state(u->sink) == PA_SINK_SUSPENDED || pa_sink_get_state(u->sink) == PA_SINK_INIT) { +                if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING || PA_PTR_TO_UINT(data) == PA_SINK_IDLE) +                    u->timestamp = pa_rtclock_now(); +            } else if (pa_sink_get_state(u->sink) == PA_SINK_RUNNING || pa_sink_get_state(u->sink) == PA_SINK_IDLE) { +                if (PA_PTR_TO_UINT(data) == PA_SINK_SUSPENDED) { +                    /* Continuously dropping data (clear counter on entering suspended state. */ +                    if (u->bytes_dropped != 0) { +                        pa_log_debug("Pipe-sink continuously dropping data - clear statistics (%zu -> 0 bytes dropped)", u->bytes_dropped); +                        u->bytes_dropped = 0; +                    } +                } +            } +            break;  -        case PA_SINK_MESSAGE_GET_LATENCY: { -            size_t n = 0; +        case PA_SINK_MESSAGE_GET_LATENCY: +            if (u->use_system_clock_for_timing) { +                pa_usec_t now; +                now = pa_rtclock_now(); +                *((int64_t*) data) = (int64_t)u->timestamp - (int64_t)now; +            } else { +                size_t n = 0;   #ifdef FIONREAD -            int l; +                int l;  -            if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0) -                n = (size_t) l; +                if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0) +                    n = (size_t) l;  #endif  -            n += u->memchunk.length; +                n += u->memchunk.length;  -            *((int64_t*) data) = pa_bytes_to_usec(n, &u->sink->sample_spec); +                *((int64_t*) data) = pa_bytes_to_usec(n, &u->sink->sample_spec); +            }              return 0; -        }      }       return pa_sink_process_msg(o, code, data, offset, chunk);  }  +static void sink_update_requested_latency_cb(pa_sink *s) { +    struct userdata *u; +    size_t nbytes; + +    pa_sink_assert_ref(s); +    pa_assert_se(u = s->userdata); + +    u->block_usec = pa_sink_get_requested_latency_within_thread(s); + +    if (u->block_usec == (pa_usec_t) -1) +        u->block_usec = s->thread_info.max_latency; + +    nbytes = pa_usec_to_bytes(u->block_usec, &s->sample_spec); +    pa_sink_set_max_request_within_thread(s, nbytes); +} + +static ssize_t pipe_sink_write(struct userdata *u, pa_memchunk *pchunk) { +    size_t index, length; +    ssize_t count = 0; +    void *p; + +    pa_assert(u); +    pa_assert(pchunk); + +    index = pchunk->index; +    length = pchunk->length; +    p = pa_memblock_acquire(pchunk->memblock); + +    for (;;) { +        ssize_t l; + +        l = pa_write(u->fd, (uint8_t*) p + index, length, &u->write_type); + +        pa_assert(l != 0); + +        if (l < 0) { +            if (errno == EAGAIN) +                break; +            else if (errno != EINTR) { +                pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno)); +                count = l - count; +                break; +            } +        } else { +            count += l; +            index += l; +            length -= l; + +            if (length <= 0) { +                break; +            } +        } +    } + +    pa_memblock_release(pchunk->memblock); + +    return count; +} + +static int process_render_use_timing(struct userdata *u, pa_usec_t now) { +    int ret = 0; +    size_t dropped = 0; +    size_t consumed = 0; + +    pa_assert(u); + +    /* This is the configured latency. Sink inputs connected to us +    might not have a single frame more than the maxrequest value +    queued. Hence: at maximum read this many bytes from the sink +    inputs. */ + +    /* Fill the buffer up the latency size */ +    while (u->timestamp < now + u->block_usec) { +        ssize_t written = 0; +        pa_memchunk chunk; + +        pa_sink_render(u->sink, u->sink->thread_info.max_request, &chunk); + +        pa_assert(chunk.length > 0); + +        if ((written = pipe_sink_write(u, &chunk)) < 0) { +            written = -1 - written; +            ret = -1; +        } + +        pa_memblock_unref(chunk.memblock); + +        u->timestamp += pa_bytes_to_usec(chunk.length, &u->sink->sample_spec); + +        dropped = chunk.length - written; + +        if (dropped != 0) { +            if (u->bytes_dropped == 0) { +                pa_log_debug("Pipe-sink just dropped %zu bytes", dropped); +                u->bytes_dropped = dropped; +            } else +                u->bytes_dropped += dropped; +        } else { +            if (u->bytes_dropped != 0) { +                pa_log_debug("Pipe-sink continuously dropped %zu bytes", u->bytes_dropped); +                u->bytes_dropped = 0; +            } +        } + +        consumed += chunk.length; + +        if (consumed >= u->sink->thread_info.max_request) +            break; +    } + +    return ret; +} +  static int process_render(struct userdata *u) {      pa_assert(u);  @@ -162,6 +305,55 @@ static int process_render(struct userdata *u) {      }  }  +static void thread_func_use_timing(void *userdata) { +    struct userdata *u = userdata; + +    pa_assert(u); + +    pa_log_debug("Thread (use timing) starting up"); + +    pa_thread_mq_install(&u->thread_mq); + +    u->timestamp = pa_rtclock_now(); + +    for (;;) { +        pa_usec_t now = 0; +        int ret; + +        if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) +            now = pa_rtclock_now(); + +        if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) +            pa_sink_process_rewind(u->sink, 0); + +        /* Render some data and write it to the fifo */ +        if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) { +            if (u->timestamp <= now) +                if (process_render_use_timing(u, now) < 0) +                    goto fail; + +            pa_rtpoll_set_timer_absolute(u->rtpoll, u->timestamp); +        } else +            pa_rtpoll_set_timer_disabled(u->rtpoll); + +        /* Hmm, nothing to do. Let's sleep */ +        if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) +            goto fail; + +        if (ret == 0) +            goto finish; +    } + +fail: +    /* If this was no regular exit from the loop we have to continue +     * processing messages until we received PA_MESSAGE_SHUTDOWN */ +    pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); +    pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); + +finish: +    pa_log_debug("Thread (use timing) shutting down"); +} +  static void thread_func(void *userdata) {      struct userdata *u = userdata;  @@ -225,6 +417,7 @@ int pa__init(pa_module *m) {      pa_modargs *ma;      struct pollfd *pollfd;      pa_sink_new_data data; +    pa_thread_func_t thread_routine;       pa_assert(m);  @@ -247,6 +440,11 @@ int pa__init(pa_module *m) {      pa_memchunk_reset(&u->memchunk);      u->rtpoll = pa_rtpoll_new();  +    if (pa_modargs_get_value_boolean(ma, "use_system_clock_for_timing", &u->use_system_clock_for_timing) < 0) { +        pa_log("Failed to parse use_system_clock_for_timing argument."); +        goto fail; +    } +      if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {          pa_log("pa_thread_mq_init() failed.");          goto fail; @@ -292,7 +490,10 @@ int pa__init(pa_module *m) {          goto fail;      }  -    u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY); +    if (u->use_system_clock_for_timing) +        u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY); +    else +        u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY);      pa_sink_new_data_done(&data);       if (!u->sink) { @@ -301,21 +502,31 @@ int pa__init(pa_module *m) {      }       u->sink->parent.process_msg = sink_process_msg; +    if (u->use_system_clock_for_timing) +        u->sink->update_requested_latency = sink_update_requested_latency_cb;      u->sink->userdata = u;       pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);      pa_sink_set_rtpoll(u->sink, u->rtpoll);  +    u->bytes_dropped = 0;      u->buffer_size = pa_frame_align(pa_pipe_buf(u->fd), &u->sink->sample_spec); +    if (u->use_system_clock_for_timing) { +        u->block_usec = pa_bytes_to_usec(u->buffer_size, &u->sink->sample_spec); +        pa_sink_set_latency_range(u->sink, 0, u->block_usec); +        thread_routine = thread_func_use_timing; +    } else { +        pa_sink_set_fixed_latency(u->sink, pa_bytes_to_usec(u->buffer_size, &u->sink->sample_spec)); +        thread_routine = thread_func; +    }      pa_sink_set_max_request(u->sink, u->buffer_size); -    pa_sink_set_fixed_latency(u->sink, pa_bytes_to_usec(u->buffer_size, &u->sink->sample_spec));       u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);      pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);      pollfd->fd = u->fd;      pollfd->events = pollfd->revents = 0;  -    if (!(u->thread = pa_thread_new("pipe-sink", thread_func, u))) { +    if (!(u->thread = pa_thread_new("pipe-sink", thread_routine, u))) {          pa_log("Failed to create thread.");          goto fail;      } -- regards, Samo