On 2016-11-28 05:01, Ahmed S. Darwish wrote: > Current pacat code reads whatever available from STDIN and writes > it directly to the playback stream. A minimal buffer is created > for each read operation; no further reads are then allowed unless > earlier read buffer has been fully consumed by a stream write. > > While quite simple, this model breaks upon the new requirements of > writing only frame-aligned data to the stream (commits #1 and #2). > The kernel read syscall can return a length much smaller than the > frame-aligned size requested, leading to invalid unaligned writes. Sorry, but I don't understand why you need a ringbuffer to resolve this? And now that I think of it, I'm not sure that the memory returned by pa_xmalloc (used to allocate the ringbuffer's memory) is guaranteed to be aligned either; it just happens to be that way in practice. Wouldn't it be better if we had something like: 1) Call pa_stream_begin_write to get a buffer. 2) If we have half a frame left over from step 4) of the previous iteration, put that half frame first in the buffer. 3) Call read / pa_read to read data from a file into the rest of the buffer. 4) If the last frame read is not complete, store that half frame in a local variable. 5) Call pa_stream_write with the number of complete frames. 6) Repeat from 1). > > This can easily be reproduced by choosing a starved STDIN backend: > > pacat /dev/random pa_stream_write() failed: EINVAL > echo 1234 | pacat pa_stream_write() failed: EINVAL > > So guard against incomplete kernel reads by using a ringbuffer. > Meanwhile leave the simple recording mode buffering as is: it just > writes to plain STDOUT without any special needs. > > CommitReference #1: 22827a5e1e62 > CommitReference #2: 150ace90f380 > BugLink: https://bugs.freedesktop.org/show_bug.cgi?id=98475 > BugLink: https://bugs.freedesktop.org/show_bug.cgi?id=77595 > Signed-off-by: Ahmed S. 
Darwish <darwish.07 at gmail.com> > --- > src/utils/pacat.c | 89 +++++++++++++++++++++++++++++++++++-------------------- > 1 file changed, 57 insertions(+), 32 deletions(-) > > diff --git a/src/utils/pacat.c b/src/utils/pacat.c > index 68362ec..39c002e 100644 > --- a/src/utils/pacat.c > +++ b/src/utils/pacat.c > @@ -38,10 +38,12 @@ > #include <pulse/pulseaudio.h> > #include <pulse/rtclock.h> > > +#include <pulsecore/atomic.h> > #include <pulsecore/core-util.h> > #include <pulsecore/i18n.h> > #include <pulsecore/log.h> > #include <pulsecore/macro.h> > +#include <pulsecore/ringbuffer.h> > #include <pulsecore/sndfile-util.h> > #include <pulsecore/sample-util.h> > > @@ -56,6 +58,11 @@ static pa_context *context = NULL; > static pa_stream *stream = NULL; > static pa_mainloop_api *mainloop_api = NULL; > > +/* Ring buffer for playback mode */ > +static pa_ringbuffer ringbuffer = { 0, }; > +static pa_atomic_t rb_count = PA_ATOMIC_INIT(0); > + > +/* Flat buffer for recording mode */ > static void *buffer = NULL; > static size_t buffer_length = 0, buffer_index = 0; > > @@ -155,29 +162,35 @@ static void start_drain(void) { > > /* Write some data to the stream */ > static void do_stream_write(size_t length) { > - size_t l; > - pa_assert(length); > + int rb_freelen; > + void *buf; > + size_t w; > > - if (!buffer || !buffer_length) > + if (!ringbuffer.memory) > return; > > - l = length; > - if (l > buffer_length) > - l = buffer_length; > + while (length > 0) { > > - if (pa_stream_write(stream, (uint8_t*) buffer + buffer_index, l, NULL, 0, PA_SEEK_RELATIVE) < 0) { > - pa_log(_("pa_stream_write() failed: %s"), pa_strerror(pa_context_errno(context))); > - quit(1); > - return; > - } > + buf = pa_ringbuffer_peek(&ringbuffer, &rb_freelen); > > - buffer_length -= l; > - buffer_index += l; > + /* Frame-align our audio packets or the server will happily ignore > + * them upon arrival. 
Also only write as much data as requested: do > + * not overflow the stream by emptying the whole ringbuffer. */ > + w = PA_MIN(pa_frame_align(length, &sample_spec), > + pa_frame_align(rb_freelen, &sample_spec)); > + if (!w) > + break; > > - if (!buffer_length) { > - pa_xfree(buffer); > - buffer = NULL; > - buffer_index = buffer_length = 0; > + if (pa_stream_write(stream, buf, w, NULL, 0, PA_SEEK_RELATIVE) < 0) { > + pa_log(_("pa_stream_write() failed: %s"), pa_strerror(pa_context_errno(context))); > + quit(1); > + return; > + } > + > + pa_ringbuffer_drop(&ringbuffer, w); > + > + pa_assert(w <= length); > + length -= w; > } > } > > @@ -192,9 +205,6 @@ static void stream_write_callback(pa_stream *s, size_t length, void *userdata) { > if (stdio_event) > mainloop_api->io_enable(stdio_event, PA_IO_EVENT_INPUT); > > - if (!buffer) > - return; > - > do_stream_write(length); > > } else { > @@ -540,25 +550,42 @@ fail: > > /* New data on STDIN **/ > static void stdin_callback(pa_mainloop_api*a, pa_io_event *e, int fd, pa_io_event_flags_t f, void *userdata) { > - size_t l, w = 0; > - ssize_t r; > - bool stream_not_ready; > + int rb_freelen; > + void *buf; > + size_t writable, r; > > pa_assert(a == mainloop_api); > pa_assert(e); > pa_assert(stdio_event == e); > > - stream_not_ready = !stream || pa_stream_get_state(stream) != PA_STREAM_READY || > - !(l = w = pa_stream_writable_size(stream)); > + /* Stream not ready? 
*/ > + if (!stream || pa_stream_get_state(stream) != PA_STREAM_READY || > + !(writable = pa_stream_writable_size(stream))) { > > - if (buffer || stream_not_ready) { > mainloop_api->io_enable(stdio_event, PA_IO_EVENT_NULL); > return; > } > > - buffer = pa_xmalloc(l); > + /* Ring APIs forces us to use integers: ensure safe casting */ > + writable = PA_MIN(writable, (size_t)INT_MAX); > + > + if (!ringbuffer.memory) { > + size_t rb_size = PA_MAX(writable, 4096 * pa_frame_size(&sample_spec)); > + ringbuffer.memory = pa_xmalloc(rb_size); > + ringbuffer.capacity = rb_size; > + ringbuffer.count = &rb_count; > + } > + > + buf = pa_ringbuffer_begin_write(&ringbuffer, &rb_freelen); > + > + if (!rb_freelen) { > + do_stream_write(writable); > + return; > + } > > - if ((r = pa_read(fd, buffer, l, userdata)) <= 0) { > + /* Avoid underruns due to a too-long read(): read only what the > + * stream requested and don't try to fill the whole ringbuffer. */ > + if ((r = pa_read(fd, buf, PA_MIN((int) writable, rb_freelen), userdata)) <= 0) { > if (r == 0) { > if (verbose) > pa_log(_("Got EOF.")); > @@ -575,11 +602,9 @@ static void stdin_callback(pa_mainloop_api*a, pa_io_event *e, int fd, pa_io_even > return; > } > > - buffer_length = (uint32_t) r; > - buffer_index = 0; > + pa_ringbuffer_end_write(&ringbuffer, r); > > - if (w) > - do_stream_write(w); > + do_stream_write(writable); > } > > /* Some data may be written to STDOUT */ >