An shm ringbuffer that is used for low overhead server-client communication.
Signalling is done through eventfd semaphores - it's based on pa_fdsem to
avoid syscalls if nothing is waiting on the other side.

pa_srbchannel_new() and pa_srbchannel_new_from_template() return NULL if the
shm block or the semaphores cannot be set up.

Signed-off-by: David Henningsson <david.henningsson at canonical.com>
---
 src/Makefile.am            |   3 +-
 src/pulsecore/srbchannel.c | 361 +++++++++++++++++++++++++++++++++++++++++++++
 src/pulsecore/srbchannel.h |  66 +++++++++
 3 files changed, 429 insertions(+), 1 deletion(-)
 create mode 100644 src/pulsecore/srbchannel.c
 create mode 100644 src/pulsecore/srbchannel.h

diff --git a/src/Makefile.am b/src/Makefile.am
index 1ac8a16..b9a660e 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -618,6 +618,7 @@ libpulsecommon_@PA_MAJORMINOR@_la_SOURCES = \
 		pulsecore/creds.h \
 		pulsecore/dynarray.c pulsecore/dynarray.h \
 		pulsecore/endianmacros.h \
+		pulsecore/fdsem.c pulsecore/fdsem.h \
 		pulsecore/flist.c pulsecore/flist.h \
 		pulsecore/g711.c pulsecore/g711.h \
 		pulsecore/hashmap.c pulsecore/hashmap.h \
@@ -651,6 +652,7 @@ libpulsecommon_@PA_MAJORMINOR@_la_SOURCES = \
 		pulsecore/queue.c pulsecore/queue.h \
 		pulsecore/random.c pulsecore/random.h \
 		pulsecore/refcnt.h \
+		pulsecore/srbchannel.c pulsecore/srbchannel.h \
 		pulsecore/sample-util.c pulsecore/sample-util.h \
 		pulsecore/shm.c pulsecore/shm.h \
 		pulsecore/bitset.c pulsecore/bitset.h \
@@ -880,7 +882,6 @@ libpulsecore_@PA_MAJORMINOR@_la_SOURCES = \
 		pulsecore/core-scache.c pulsecore/core-scache.h \
 		pulsecore/core-subscribe.c pulsecore/core-subscribe.h \
 		pulsecore/core.c pulsecore/core.h \
-		pulsecore/fdsem.c pulsecore/fdsem.h \
 		pulsecore/hook-list.c pulsecore/hook-list.h \
 		pulsecore/ltdl-helper.c pulsecore/ltdl-helper.h \
 		pulsecore/modargs.c pulsecore/modargs.h \
diff --git a/src/pulsecore/srbchannel.c b/src/pulsecore/srbchannel.c
new file mode 100644
index 0000000..5fe2220
--- /dev/null
+++ b/src/pulsecore/srbchannel.c
@@ -0,0 +1,361 @@
+/***
+  This file is part of PulseAudio.
+
+  Copyright 2014 David Henningsson, Canonical Ltd.
+
+  PulseAudio is free software; you can redistribute it and/or modify
+  it under the terms of the GNU Lesser General Public License as
+  published by the Free Software Foundation; either version 2.1 of the
+  License, or (at your option) any later version.
+
+  PulseAudio is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with PulseAudio; if not, write to the Free Software
+  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+  USA.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include "srbchannel.h"
+
+#include <pulsecore/atomic.h>
+#include <pulse/xmalloc.h>
+
+/* #define DEBUG_SRBCHANNEL */
+
+/* This ringbuffer might be useful in other contexts too, but
+   right now it's only used inside the srbchannel, so let's keep it here
+   for the time being.
+
+   count points at the fill level (in bytes) and is the only field that is
+   accessed atomically. readindex is only used by peek/drop and writeindex
+   only by begin_write/end_write. */
+typedef struct pa_ringbuffer pa_ringbuffer;
+struct pa_ringbuffer {
+    pa_atomic_t *count;
+    int capacity;
+    uint8_t *memory;
+    int readindex, writeindex;
+};
+
+/* Returns a pointer to the next readable byte and stores in *count how many
+   bytes can be read contiguously from there, i.e. without wrapping past the
+   end of the buffer. Nothing is consumed. */
+static void *pa_ringbuffer_peek(pa_ringbuffer *r, int *count) {
+    int c = pa_atomic_load(r->count);
+    if (r->readindex + c > r->capacity)
+        *count = r->capacity - r->readindex;
+    else
+        *count = c;
+    return r->memory + r->readindex;
+}
+
+/* Consumes count bytes. Returns true only if the buffer was completely full
+   before the drop. */
+static bool pa_ringbuffer_drop(pa_ringbuffer *r, int count) {
+    bool b = pa_atomic_sub(r->count, count) >= r->capacity;
+    r->readindex += count;
+    r->readindex %= r->capacity;
+    return b;
+}
+
+/* Returns a pointer to the next writable byte and stores in *count how many
+   bytes can be written contiguously from there: limited both by the end of
+   the buffer and by the free space. Nothing is committed. */
+static void *pa_ringbuffer_begin_write(pa_ringbuffer *r, int *count) {
+    int c = pa_atomic_load(r->count);
+    *count = PA_MIN(r->capacity - r->writeindex, r->capacity - c);
+    return r->memory + r->writeindex;
+}
+
+/* Commits count bytes that were stored at the begin_write pointer. */
+static void pa_ringbuffer_end_write(pa_ringbuffer *r, int count) {
+    pa_atomic_add(r->count, count);
+    r->writeindex += count;
+    r->writeindex %= r->capacity;
+}
+
+struct pa_srbchannel {
+    pa_ringbuffer rb_read, rb_write;
+    pa_fdsem *sem_read, *sem_write;
+    pa_memblock *memblock;
+    void *cb_userdata;
+    pa_srbchannel_cb_t callback;
+    pa_io_event *read_event;
+    pa_mainloop_api *mainloop;
+};
+
+/* We always listen to sem_read, and always signal on sem_write.
+
+   This means we signal the same semaphore for two scenarios:
+   1) We have written something to our send buffer, and want the other
+      side to read it
+   2) We have read something from our receive buffer that was previously
+      completely full, and want the other side to continue writing
+*/
+
+/* Copies up to l bytes from data into the send buffer and posts sem_write
+   (also when nothing could be written). Returns the number of bytes actually
+   written, which is less than l if the send buffer fills up. */
+size_t pa_srbchannel_write(pa_srbchannel *sr, const void *data, size_t l) {
+    size_t written = 0;
+    while (l > 0) {
+        int towrite;
+        void *ptr = pa_ringbuffer_begin_write(&sr->rb_write, &towrite);
+        if ((size_t) towrite > l)
+            towrite = l;
+        if (towrite == 0) {
+#ifdef DEBUG_SRBCHANNEL
+            pa_log("srbchannel output buffer full");
+#endif
+            break;
+        }
+        memcpy(ptr, data, towrite);
+        pa_ringbuffer_end_write(&sr->rb_write, towrite);
+        written += towrite;
+        data = (uint8_t*) data + towrite;
+        l -= towrite;
+    }
+#ifdef DEBUG_SRBCHANNEL
+    pa_log("Wrote %d bytes to srbchannel, signalling fdsem", (int) written);
+#endif
+    pa_fdsem_post(sr->sem_write);
+    return written;
+}
+
+/* Copies up to l bytes from the receive buffer into data. Returns the number
+   of bytes actually read, which is less than l if the buffer runs empty. */
+size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l) {
+    size_t isread = 0;
+    while (l > 0) {
+        int toread;
+        void *ptr = pa_ringbuffer_peek(&sr->rb_read, &toread);
+        if ((size_t) toread > l)
+            toread = l;
+        if (toread == 0)
+            break;
+        memcpy(data, ptr, toread);
+        if (pa_ringbuffer_drop(&sr->rb_read, toread)) {
+#ifdef DEBUG_SRBCHANNEL
+            pa_log("read from full output buffer, signalling fdsem");
+#endif
+            pa_fdsem_post(sr->sem_write);
+        }
+
+        isread += toread;
+        data = (uint8_t*) data + toread;
+        l -= toread;
+    }
+#ifdef DEBUG_SRBCHANNEL
+    pa_log("Read %d bytes from srbchannel", (int) isread);
+#endif
+    return isread;
+}
+
+/* This is the memory layout of the ringbuffer shm block. It is followed by
+   read and write ringbuffer memory. */
+struct srbheader {
+    pa_atomic_t read_count;
+    pa_atomic_t write_count;
+    pa_fdsem_data read_semdata;
+    pa_fdsem_data write_semdata;
+    int capacity;
+    int readbuf_offset;
+    int writebuf_offset;
+    /* TODO: Maybe a marker here to make sure we talk to a server with equally sized struct */
+};
+
+/* Runs the callback, and runs it again for as long as
+   pa_fdsem_before_poll() returns a negative value. Stops immediately if the
+   callback returns false, because sr might no longer be valid then. */
+static void srbchannel_rwloop(pa_srbchannel* sr) {
+    do {
+#ifdef DEBUG_SRBCHANNEL
+        int q;
+        pa_ringbuffer_peek(&sr->rb_read, &q);
+        pa_log("In rw loop from srbchannel, before callback, count = %d", q);
+#endif
+
+        if (sr->callback)
+            if (!sr->callback(sr, sr->cb_userdata)) {
+#ifdef DEBUG_SRBCHANNEL
+                pa_log("Aborting read loop from srbchannel");
+#endif
+                return;
+            }
+
+#ifdef DEBUG_SRBCHANNEL
+        pa_ringbuffer_peek(&sr->rb_read, &q);
+        pa_log("In rw loop from srbchannel, after callback, count = %d", q);
+#endif
+
+    } while (pa_fdsem_before_poll(sr->sem_read) < 0);
+}
+
+/* IO event callback for the sem_read fd. */
+static void semread_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t events, void *userdata) {
+    pa_srbchannel* sr = userdata;
+
+    pa_fdsem_after_poll(sr->sem_read);
+    srbchannel_rwloop(sr);
+}
+
+/* Allocates a block from the mempool p, lays out the header and the two
+   ringbuffers in it and starts listening on the read semaphore. Returns NULL
+   if the memblock or either of the semaphores could not be created. */
+pa_srbchannel* pa_srbchannel_new(pa_mainloop_api *m, pa_mempool *p) {
+    int capacity;
+    int readfd;
+    struct srbheader *srh;
+
+    pa_srbchannel* sr = pa_xmalloc0(sizeof(pa_srbchannel));
+    sr->mainloop = m;
+    sr->memblock = pa_memblock_new_pool(p, -1);
+    if (!sr->memblock)
+        goto fail;
+
+    srh = pa_memblock_acquire(sr->memblock);
+    pa_zero(*srh);
+
+    sr->rb_read.memory = (uint8_t*) srh + PA_ALIGN(sizeof(*srh));
+    srh->readbuf_offset = sr->rb_read.memory - (uint8_t*) srh;
+    capacity = (pa_memblock_get_length(sr->memblock) - srh->readbuf_offset) / 2;
+    sr->rb_write.memory = PA_ALIGN_PTR(sr->rb_read.memory + capacity);
+    srh->writebuf_offset = sr->rb_write.memory - (uint8_t*) srh;
+    capacity = PA_MIN(capacity, srh->writebuf_offset - srh->readbuf_offset);
+    pa_log_debug("SHM block is %d bytes, ringbuffer capacity is 2 * %d bytes",
+        (int) pa_memblock_get_length(sr->memblock), capacity);
+
+    srh->capacity = sr->rb_read.capacity = sr->rb_write.capacity = capacity;
+    sr->rb_read.count = &srh->read_count;
+    sr->rb_write.count = &srh->write_count;
+
+    sr->sem_read = pa_fdsem_new_shm(&srh->read_semdata);
+    if (!sr->sem_read)
+        goto fail;
+
+    sr->sem_write = pa_fdsem_new_shm(&srh->write_semdata);
+    if (!sr->sem_write)
+        goto fail;
+
+    readfd = pa_fdsem_get(sr->sem_read);
+#ifdef DEBUG_SRBCHANNEL
+    pa_log("Enabling io event on fd %d", readfd);
+#endif
+    sr->read_event = m->io_new(m, readfd, PA_IO_EVENT_INPUT, semread_cb, sr);
+    m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);
+
+    return sr;
+
+fail:
+    /* pa_srbchannel_free() skips every member that is still NULL. */
+    pa_srbchannel_free(sr);
+    return NULL;
+}
+
+/* Exchanges the read and write side (semaphores and ringbuffers) of sr. */
+static void pa_srbchannel_swap(pa_srbchannel *sr) {
+    pa_srbchannel temp = *sr;
+    sr->sem_read = temp.sem_write;
+    sr->sem_write = temp.sem_read;
+    sr->rb_read = temp.rb_write;
+    sr->rb_write = temp.rb_read;
+}
+
+/* Attaches to the shm block and semaphore fds described by t, with read and
+   write swapped. Note that readfd and writefd in t are swapped as well.
+   Returns NULL if either of the semaphores could not be opened. */
+pa_srbchannel* pa_srbchannel_new_from_template(pa_mainloop_api *m, pa_srbchannel_template *t)
+{
+    int temp;
+    struct srbheader *srh;
+    pa_srbchannel* sr = pa_xmalloc0(sizeof(pa_srbchannel));
+
+    sr->mainloop = m;
+    sr->memblock = t->memblock;
+    pa_memblock_ref(sr->memblock);
+    srh = pa_memblock_acquire(sr->memblock);
+
+    sr->rb_read.capacity = sr->rb_write.capacity = srh->capacity;
+    sr->rb_read.count = &srh->read_count;
+    sr->rb_write.count = &srh->write_count;
+    sr->rb_read.memory = (uint8_t*) srh + srh->readbuf_offset;
+    sr->rb_write.memory = (uint8_t*) srh + srh->writebuf_offset;
+
+    sr->sem_read = pa_fdsem_open_shm(&srh->read_semdata, t->readfd);
+    if (!sr->sem_read)
+        goto fail;
+
+    sr->sem_write = pa_fdsem_open_shm(&srh->write_semdata, t->writefd);
+    if (!sr->sem_write)
+        goto fail;
+
+    pa_srbchannel_swap(sr);
+    temp = t->readfd; t->readfd = t->writefd; t->writefd = temp;
+
+#ifdef DEBUG_SRBCHANNEL
+    pa_log("Enabling io event on fd %d", t->readfd);
+#endif
+    sr->read_event = m->io_new(m, t->readfd, PA_IO_EVENT_INPUT, semread_cb, sr);
+    m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);
+
+    return sr;
+
+fail:
+    pa_srbchannel_free(sr);
+    return NULL;
+}
+
+/* Fills in t with the memblock and the two semaphore fds of sr. No extra
+   memblock reference is taken. */
+void pa_srbchannel_export(pa_srbchannel *sr, pa_srbchannel_template *t) {
+    t->memblock = sr->memblock;
+    t->readfd = pa_fdsem_get(sr->sem_read);
+    t->writefd = pa_fdsem_get(sr->sem_write);
+}
+
+/* Installs (or, with callback == NULL, removes) the callback. A non-NULL
+   callback is invoked right away from within this call. */
+void pa_srbchannel_set_callback(pa_srbchannel *sr, pa_srbchannel_cb_t callback, void *userdata) {
+    if (sr->callback)
+        pa_fdsem_after_poll(sr->sem_read);
+
+    sr->callback = callback;
+    sr->cb_userdata = userdata;
+
+    if (sr->callback)
+        /* Maybe deferred event? */
+        srbchannel_rwloop(sr);
+}
+
+/* Releases everything the channel holds. Members that were never set up are
+   skipped, so this is also used on the error paths of the constructors. */
+void pa_srbchannel_free(pa_srbchannel *sr)
+{
+#ifdef DEBUG_SRBCHANNEL
+    pa_log("Freeing srbchannel");
+#endif
+    pa_assert(sr);
+
+    if (sr->read_event)
+        sr->mainloop->io_free(sr->read_event);
+
+    if (sr->sem_read)
+        pa_fdsem_free(sr->sem_read);
+    if (sr->sem_write)
+        pa_fdsem_free(sr->sem_write);
+
+    if (sr->memblock) {
+        pa_memblock_release(sr->memblock);
+        pa_memblock_unref(sr->memblock);
+    }
+
+    pa_xfree(sr);
+}
diff --git a/src/pulsecore/srbchannel.h b/src/pulsecore/srbchannel.h
new file mode 100644
index 0000000..843bf96
--- /dev/null
+++ b/src/pulsecore/srbchannel.h
@@ -0,0 +1,66 @@
+#ifndef foopulsesrbchannelhfoo
+#define foopulsesrbchannelhfoo
+
+/***
+  This file is part of PulseAudio.
+
+  Copyright 2014 David Henningsson, Canonical Ltd.
+
+  PulseAudio is free software; you can redistribute it and/or modify
+  it under the terms of the GNU Lesser General Public License as
+  published by the Free Software Foundation; either version 2.1 of the
+  License, or (at your option) any later version.
+
+  PulseAudio is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with PulseAudio; if not, write to the Free Software
+  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+  USA.
+***/
+
+#include <pulse/mainloop-api.h>
+#include <pulsecore/fdsem.h>
+#include <pulsecore/memblock.h>
+
+/* An shm ringbuffer that is used for low overhead server-client communication.
+   Signaling is done through eventfd semaphores (pa_fdsem). */
+
+typedef struct pa_srbchannel pa_srbchannel;
+
+typedef struct pa_srbchannel_template {
+    int readfd, writefd;
+    pa_memblock *memblock;
+} pa_srbchannel_template;
+
+/* Creates a new srbchannel backed by a block from the mempool p.
+   Returns NULL on failure. */
+pa_srbchannel* pa_srbchannel_new(pa_mainloop_api *m, pa_mempool *p);
+/* Note: this creates a srbchannel with swapped read and write, and also
+   swaps readfd and writefd in t. Returns NULL on failure. */
+pa_srbchannel* pa_srbchannel_new_from_template(pa_mainloop_api *m, pa_srbchannel_template *t);
+
+void pa_srbchannel_free(pa_srbchannel *sr);
+
+void pa_srbchannel_export(pa_srbchannel *sr, pa_srbchannel_template *t);
+
+/* Both return the number of bytes actually transferred, which may be less than l. */
+size_t pa_srbchannel_write(pa_srbchannel *sr, const void *data, size_t l);
+size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l);
+
+/* Set the callback function that is called whenever data becomes available for reading.
+   It can also be called if the output buffer was full and can now be written to.
+
+   Return false to abort all processing (e.g. if the srbchannel has been freed during the callback).
+   Otherwise return true.
+
+   Note that the callback will be called immediately, to be able to process stuff that
+   might already be in the buffer.
+*/
+typedef bool (*pa_srbchannel_cb_t)(pa_srbchannel *sr, void *userdata);
+void pa_srbchannel_set_callback(pa_srbchannel *sr, pa_srbchannel_cb_t callback, void *userdata);
+
+#endif
-- 
1.9.1