Hello, > An shm ringbuffer that is used for low overhead server-client communication. > Signalling is done through eventfd semaphores - it's based on pa_fdsem to avoid > syscalls if nothing is waiting on the other side. comment inline > --- > src/Makefile.am | 3 +- > src/pulsecore/srchannel.c | 297 ++++++++++++++++++++++++++++++++++++++++++++++ > src/pulsecore/srchannel.h | 62 ++++++++++ > 3 files changed, 361 insertions(+), 1 deletion(-) > create mode 100644 src/pulsecore/srchannel.c > create mode 100644 src/pulsecore/srchannel.h > > diff --git a/src/Makefile.am b/src/Makefile.am > index 5c2d5bc..9e72982 100644 > --- a/src/Makefile.am > +++ b/src/Makefile.am > @@ -618,6 +618,7 @@ libpulsecommon_ at PA_MAJORMINOR@_la_SOURCES = \ > pulsecore/creds.h \ > pulsecore/dynarray.c pulsecore/dynarray.h \ > pulsecore/endianmacros.h \ > + pulsecore/fdsem.c pulsecore/fdsem.h \ > pulsecore/flist.c pulsecore/flist.h \ > pulsecore/g711.c pulsecore/g711.h \ > pulsecore/hashmap.c pulsecore/hashmap.h \ > @@ -651,6 +652,7 @@ libpulsecommon_ at PA_MAJORMINOR@_la_SOURCES = \ > pulsecore/queue.c pulsecore/queue.h \ > pulsecore/random.c pulsecore/random.h \ > pulsecore/refcnt.h \ > + pulsecore/srchannel.c pulsecore/srchannel.h \ > pulsecore/sample-util.c pulsecore/sample-util.h \ > pulsecore/shm.c pulsecore/shm.h \ > pulsecore/bitset.c pulsecore/bitset.h \ > @@ -880,7 +882,6 @@ libpulsecore_ at PA_MAJORMINOR@_la_SOURCES = \ > pulsecore/core-scache.c pulsecore/core-scache.h \ > pulsecore/core-subscribe.c pulsecore/core-subscribe.h \ > pulsecore/core.c pulsecore/core.h \ > - pulsecore/fdsem.c pulsecore/fdsem.h \ > pulsecore/hook-list.c pulsecore/hook-list.h \ > pulsecore/ltdl-helper.c pulsecore/ltdl-helper.h \ > pulsecore/modargs.c pulsecore/modargs.h \ > diff --git a/src/pulsecore/srchannel.c b/src/pulsecore/srchannel.c > new file mode 100644 > index 0000000..15485b2 > --- /dev/null > +++ b/src/pulsecore/srchannel.c > @@ -0,0 +1,297 @@ > +/*** > + This file is part of PulseAudio. > + > + Copyright 2014 David Henningsson, Canonical Ltd. > + > + PulseAudio is free software; you can redistribute it and/or modify > + it under the terms of the GNU Lesser General Public License as > + published by the Free Software Foundation; either version 2.1 of the > + License, or (at your option) any later version. > + > + PulseAudio is distributed in the hope that it will be useful, but > + WITHOUT ANY WARRANTY; without even the implied warranty of > + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > + Lesser General Public License for more details. > + > + You should have received a copy of the GNU Lesser General Public > + License along with PulseAudio; if not, write to the Free Software > + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 > + USA. > +***/ > + > +#ifdef HAVE_CONFIG_H > +#include <config.h> > +#endif > + > +#include "srchannel.h" > + > +#include <pulsecore/atomic.h> > +#include <pulse/xmalloc.h> > + > +/* #define DEBUG_SRCHANNEL */ > + > +/* This ringbuffer might be useful in other contexts too, but > + right now it's only used inside the srchannel, so let's keep it here > + for the time being. */ count is atomic, but readindex, writeindex are not the assumption is that there is exactly one reader and exactly one writer? > +typedef struct pa_ringbuffer pa_ringbuffer; > +struct pa_ringbuffer { > + pa_atomic_t *count; > + int capacity; > + uint8_t *memory; > + int readindex, writeindex; > +}; > + > +static void *pa_ringbuffer_peek(pa_ringbuffer *r, int *count) { > + int c = pa_atomic_load(r->count); > + if (r->readindex + c > r->capacity) > + *count = r->capacity - r->readindex; > + else > + *count = c; > + return r->memory + r->readindex; > +} > + > +/* Returns true only if the buffer was completely full before the drop. */ > +static bool pa_ringbuffer_drop(pa_ringbuffer *r, int count) { > + bool b = pa_atomic_sub(r->count, count) >= r->capacity; > + r->readindex += count; > + r->readindex %= r->capacity; > + return b; > +} > + > +static void *pa_ringbuffer_begin_write(pa_ringbuffer *r, int *count) { > + int c = pa_atomic_load(r->count); > + *count = PA_MIN(r->capacity - r->writeindex, r->capacity - c); > + return r->memory + r->writeindex; > +} > + > +static void pa_ringbuffer_end_write(pa_ringbuffer *r, int count) { > + pa_atomic_add(r->count, count); > + r->writeindex += count; > + r->writeindex %= r->capacity; > +} > + > +struct pa_srchannel { > + pa_ringbuffer rb_read, rb_write; > + pa_fdsem *sem_read, *sem_write; > + pa_memblock *memblock; > + void *cb_userdata; > + pa_srchannel_cb_t callback; > + pa_io_event *read_event; > + pa_mainloop_api *mainloop; > +}; > + > +ssize_t pa_srchannel_write(pa_srchannel *sr, const void *data, size_t l) { > + ssize_t written = 0; > + while (l > 0) { > + int towrite; > + void *ptr = pa_ringbuffer_begin_write(&sr->rb_write, &towrite); > + if ((size_t) towrite > l) > + towrite = l; > + if (towrite == 0) { > +#ifdef DEBUG_SRCHANNEL > + pa_log("srchannel output buffer full"); > +#endif > + break; > + } > + memcpy(ptr, data, towrite); > + pa_ringbuffer_end_write(&sr->rb_write, towrite); > + written += towrite; > + data = (uint8_t*) data + towrite; > + l -= towrite; > + } > +#ifdef DEBUG_SRCHANNEL > + pa_log("Wrote %d bytes to srchannel, signalling fdsem", (int) written); > +#endif > + pa_fdsem_post(sr->sem_write); > + return written; > +} > + > +ssize_t pa_srchannel_read(pa_srchannel *sr, void *data, size_t l) { > + ssize_t isread = 0; > + while (l > 0) { > + int toread; > + void *ptr = pa_ringbuffer_peek(&sr->rb_read, &toread); > + if ((size_t) toread > l) > + toread = l; > + if (toread == 0) > + break; > + memcpy(data, ptr, toread); > + if (pa_ringbuffer_drop(&sr->rb_read, toread)) { > +#ifdef DEBUG_SRCHANNEL > + pa_log("read from full output buffer, signalling fdsem"); > +#endif > + pa_fdsem_post(sr->sem_write); > + } > + > + isread += toread; > + data = (uint8_t*) data + toread; > + l -= toread; > + } > +#ifdef DEBUG_SRCHANNEL > + pa_log("Read %d bytes from srchannel", (int) isread); > +#endif > + return isread; > +} > + > +/* This is the memory layout of the ringbuffer shm block. It is followed by > + read and write ringbuffer memory. */ > +struct srheader { > + pa_atomic_t read_count; > + pa_atomic_t write_count; > + pa_fdsem_data read_semdata; > + pa_fdsem_data write_semdata; > + int capacity; > + int readbuf_offset; > + int writebuf_offset; > + /* TODO: Maybe a marker here to make sure we talk to a server with equally sized struct */ > +}; > + > +static void read_loop(pa_srchannel* sr) { > + do { > + int q; > + pa_ringbuffer_peek(&sr->rb_read, &q); > +#ifdef DEBUG_SRCHANNEL > + pa_log("In read loop from srchannel, before callback, count = %d", q); > +#endif > + > + if (q > 0 && sr->callback) > + if (!sr->callback(sr, sr->cb_userdata)) { > +#ifdef DEBUG_SRCHANNEL > + pa_log("Aborting read loop from srchannel"); > +#endif > + return; > + } > + > +#ifdef DEBUG_SRCHANNEL > + pa_ringbuffer_peek(&sr->rb_read, &q); > + pa_log("In read loop from srchannel, after callback, count = %d", q); > +#endif > + > + } while (pa_fdsem_before_poll(sr->sem_read) < 0); > +} > + > +static void read_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t events, void *userdata) { > + pa_srchannel* sr = userdata; > + > + pa_fdsem_after_poll(sr->sem_read); > + read_loop(sr); > +} > + > +pa_srchannel* pa_srchannel_new(pa_mainloop_api *m, pa_mempool *p) { > + int capacity; > + int readfd; > + struct srheader *srh; > + > + pa_srchannel* sr = pa_xmalloc0(sizeof(pa_srchannel)); > + sr->mainloop = m; > + sr->memblock = pa_memblock_new_pool(p, -1); > + srh = pa_memblock_acquire(sr->memblock); > + pa_zero(*srh); > + > + sr->rb_read.memory = (uint8_t*) srh + PA_ALIGN(sizeof(*srh)); > + srh->readbuf_offset = sr->rb_read.memory - (uint8_t*) srh; > + capacity = (pa_memblock_get_length(sr->memblock) - srh->readbuf_offset) / 2; > + sr->rb_write.memory = PA_ALIGN_PTR(sr->rb_read.memory + capacity); > + srh->writebuf_offset = sr->rb_write.memory - (uint8_t*) srh; > + capacity = PA_MIN(capacity, srh->writebuf_offset - srh->readbuf_offset); > + pa_log_debug("SHM block is %d bytes, ringbuffer capacity is 2 * %d bytes", > + (int) pa_memblock_get_length(sr->memblock), capacity); > + > + srh->capacity = sr->rb_read.capacity = sr->rb_write.capacity = capacity; > + sr->rb_read.count = &srh->read_count; > + sr->rb_write.count = &srh->write_count; > + > + sr->sem_read = pa_fdsem_new_shm(&srh->read_semdata); > + sr->sem_write = pa_fdsem_new_shm(&srh->write_semdata); > + > + readfd = pa_fdsem_get(sr->sem_read); > +#ifdef DEBUG_SRCHANNEL > + pa_log("Enabling io event on fd %d", readfd); > +#endif > + sr->read_event = m->io_new(m, readfd, PA_IO_EVENT_INPUT, read_cb, sr); > + m->io_enable(sr->read_event, PA_IO_EVENT_INPUT); > + > + return sr; > +} > + > +static void pa_srchannel_swap(pa_srchannel *sr) { > + pa_srchannel temp = *sr; > + sr->sem_read = temp.sem_write; > + sr->sem_write = temp.sem_read; > + sr->rb_read = temp.rb_write; > + sr->rb_write = temp.rb_read; > +} > + > +pa_srchannel* pa_srchannel_new_from_template(pa_mainloop_api *m, pa_srchannel_template *t) > +{ > + int temp; > + struct srheader *srh; > + pa_srchannel* sr = pa_xmalloc0(sizeof(pa_srchannel)); > + > + sr->mainloop = m; > + sr->memblock = t->memblock; > + pa_memblock_ref(sr->memblock); > + srh = pa_memblock_acquire(sr->memblock); > + > + sr->rb_read.capacity = sr->rb_write.capacity = srh->capacity; > + sr->rb_read.count = &srh->read_count; > + sr->rb_write.count = &srh->write_count; > + sr->rb_read.memory = (uint8_t*) srh + srh->readbuf_offset; > + sr->rb_write.memory = (uint8_t*) srh + srh->writebuf_offset; > + > + sr->sem_read = pa_fdsem_open_shm(&srh->read_semdata, t->readfd); > + sr->sem_write = pa_fdsem_open_shm(&srh->write_semdata, t->writefd); > + > + pa_srchannel_swap(sr); > + temp = t->readfd; t->readfd = t->writefd; t->writefd = temp; > + > +#ifdef DEBUG_SRCHANNEL > + pa_log("Enabling io event on fd %d", t->readfd); > +#endif > + sr->read_event = m->io_new(m, t->readfd, PA_IO_EVENT_INPUT, read_cb, sr); > + m->io_enable(sr->read_event, PA_IO_EVENT_INPUT); > + > + return sr; > +} > + > +void pa_srchannel_export(pa_srchannel *sr, pa_srchannel_template *t) { > + t->memblock = sr->memblock; > + t->readfd = pa_fdsem_get(sr->sem_read); > + t->writefd = pa_fdsem_get(sr->sem_write); > +} > + > +void pa_srchannel_set_callback(pa_srchannel *sr, pa_srchannel_cb_t callback, void *userdata) { > + if (sr->callback) > + pa_fdsem_after_poll(sr->sem_read); > + > + sr->callback = callback; > + sr->cb_userdata = userdata; > + > + if (sr->callback) > + /* Maybe deferred event? */ > + read_loop(sr); > +} > + > +void pa_srchannel_free(pa_srchannel *sr) > +{ > +#ifdef DEBUG_SRCHANNEL > + pa_log("Freeing srchannel"); > +#endif > + if (!sr) > + return; > + > + if (sr->read_event) > + sr->mainloop->io_free(sr->read_event); > + > + if (sr->sem_read) > + pa_fdsem_free(sr->sem_read); > + if (sr->sem_write) > + pa_fdsem_free(sr->sem_write); > + > + if (sr->memblock) { > + pa_memblock_release(sr->memblock); > + pa_memblock_unref(sr->memblock); > + } > + > + pa_xfree(sr); > +} > diff --git a/src/pulsecore/srchannel.h b/src/pulsecore/srchannel.h > new file mode 100644 > index 0000000..8eac77f > --- /dev/null > +++ b/src/pulsecore/srchannel.h > @@ -0,0 +1,62 @@ > +#ifndef foopulsesrchannelhfoo > +#define foopulsesrchannelhfoo > + > +/*** > + This file is part of PulseAudio. > + > + Copyright 2014 David Henningsson, Canonical Ltd. > + > + PulseAudio is free software; you can redistribute it and/or modify > + it under the terms of the GNU Lesser General Public License as > + published by the Free Software Foundation; either version 2.1 of the > + License, or (at your option) any later version. > + > + PulseAudio is distributed in the hope that it will be useful, but > + WITHOUT ANY WARRANTY; without even the implied warranty of > + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > + Lesser General Public License for more details. > + > + You should have received a copy of the GNU Lesser General Public > + License along with PulseAudio; if not, write to the Free Software > + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 > + USA. > +***/ > + > +#include <pulse/mainloop-api.h> > +#include <pulsecore/fdsem.h> > +#include <pulsecore/memblock.h> > + > +/* An shm ringbuffer that is used for low overhead server-client communication. > + Signaling is done through eventfd semaphores (pa_fdsem). */ > + > +typedef struct pa_srchannel pa_srchannel; > + > +typedef struct pa_srchannel_template { > + int readfd, writefd; > + pa_memblock *memblock; > +} pa_srchannel_template; > + > +pa_srchannel* pa_srchannel_new(pa_mainloop_api *m, pa_mempool *p); > +/* Note: this creates a srchannel with swapped read and write. */ > +pa_srchannel* pa_srchannel_new_from_template(pa_mainloop_api *m, pa_srchannel_template *t); > + > +void pa_srchannel_free(pa_srchannel *sr); > + > +void pa_srchannel_export(pa_srchannel *sr, pa_srchannel_template *t); > + > +ssize_t pa_srchannel_write(pa_srchannel *sr, const void *data, size_t l); > +ssize_t pa_srchannel_read(pa_srchannel *sr, void *data, size_t l); > + > +/* Set the callback function that is called whenever data becomes available for reading. > + It can also be called if the output buffer was full and can now be written to. > + > + Return false to abort all processing (e g if the srchannel has been freed during the callback). > + Otherwise return true. > + > + Note that if the ringbuffer is not empty when the callback is set up, the callback > + will be called immediately. > +*/ > +typedef bool (*pa_srchannel_cb_t)(pa_srchannel *sr, void *userdata); > +void pa_srchannel_set_callback(pa_srchannel *sr, pa_srchannel_cb_t callback, void *userdata); > + > +#endif > -- Peter Meerwald +43-664-2444418 (mobile)