Added option auto_drain_pipe_on_resume to enable draining any remaining data from the pipe upon every pipe-sink resume out of suspend. When a pipe reader fails, the pipe sink fills up the pipe and starts dropping instead of writing new data. Old data remains in the pipe to be consumed by the eventually recovered or replaced reader. By each new drop a gap between the pipe content and new data to be written grows. If the sink suspends while dropping, resuming from suspend is going to clear the pipe and start writing new data into an empty pipe, thus removing the gap (old, potentially irrelevant data). The change asures that every pipe-sink resume from suspend makes its first write into an empty pipe. --- src/modules/module-pipe-sink.c | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/src/modules/module-pipe-sink.c b/src/modules/module-pipe-sink.c index 995785e..10ce882 100644 --- a/src/modules/module-pipe-sink.c +++ b/src/modules/module-pipe-sink.c @@ -62,6 +62,7 @@ PA_MODULE_USAGE( "channels=<number of channels> " "channel_map=<channel map> " "use_system_clock_for_timing=<yes or no> " + "auto_drain_pipe_on_resume=<yes or no> " ); #define DEFAULT_FILE_NAME "fifo_output" @@ -82,16 +83,19 @@ struct userdata { size_t buffer_size; size_t bytes_dropped; bool fifo_error; + uint8_t *drain_buffer; pa_memchunk memchunk; pa_rtpoll_item *rtpoll_item; + int read_type; int write_type; pa_usec_t block_usec; pa_usec_t timestamp; bool use_system_clock_for_timing; + bool auto_drain_pipe_on_resume; }; static const char* const valid_modargs[] = { @@ -103,17 +107,39 @@ static const char* const valid_modargs[] = { "channels", "channel_map", "use_system_clock_for_timing", + "auto_drain_pipe_on_resume", NULL }; +static ssize_t drain_pipe_sink(struct userdata *u) { + ssize_t l, drained = 0; + + pa_assert(u); + + do { + l = pa_read(u->fd, u->drain_buffer, u->buffer_size, &u->read_type); + + if (l > 0) + drained += l; + + } while ((l > 0) || (errno == EINTR)); + + return drained; +} + static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct userdata *u = PA_SINK(o)->userdata; switch (code) { case PA_SINK_MESSAGE_SET_STATE: if (u->sink->thread_info.state == PA_SINK_SUSPENDED || u->sink->thread_info.state == PA_SINK_INIT) { - if (PA_SINK_IS_OPENED(PA_PTR_TO_UINT(data))) + if (PA_SINK_IS_OPENED(PA_PTR_TO_UINT(data))) { u->timestamp = pa_rtclock_now(); + if (u->auto_drain_pipe_on_resume) { + ssize_t d = drain_pipe_sink(u); + pa_log_debug("Pipe-sink resume from suspend: auto-drained %zd bytes from the pipe", d); + } + } } else if (u->sink->thread_info.state == PA_SINK_RUNNING || u->sink->thread_info.state == PA_SINK_IDLE) { if (PA_PTR_TO_UINT(data) == PA_SINK_SUSPENDED) { /* Clear potential FIFO error flag */ @@ -443,11 +469,17 @@ int pa__init(pa_module *m) { goto fail; } + if (pa_modargs_get_value_boolean(ma, "auto_drain_pipe_on_resume", &u->auto_drain_pipe_on_resume) < 0) { + pa_log("Failed to parse auto_drain_pipe_on_resume argument."); + goto fail; + } + if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) { pa_log("pa_thread_mq_init() failed."); goto fail; } + u->read_type = 0; u->write_type = 0; u->filename = pa_runtime_path(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME)); @@ -525,6 +557,9 @@ int pa__init(pa_module *m) { } pa_sink_set_max_request(u->sink, u->buffer_size); + if (u->auto_drain_pipe_on_resume) + u->drain_buffer = (uint8_t *)pa_xmalloc(u->buffer_size); + u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1); pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); pollfd->fd = u->fd; -- 2.7.4