For writing, we prefer writing through the srchannel if one is available, and we have no ancil data to send. For reading, we support reading from both in parallel. This meant replicating a struct used for reading, so a lot of this patch is just a search/replace in do_read to use the appropriate channel for reading. Signed-off-by: David Henningsson <david.henningsson at canonical.com> --- src/pulsecore/pstream.c | 209 ++++++++++++++++++++++++++++++------------------ src/pulsecore/pstream.h | 5 ++ 2 files changed, 138 insertions(+), 76 deletions(-) diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c index 539c4a2..42fe187 100644 --- a/src/pulsecore/pstream.c +++ b/src/pulsecore/pstream.c @@ -109,12 +109,22 @@ struct item_info { uint32_t block_id; }; +struct pstream_read { + pa_pstream_descriptor descriptor; + pa_memblock *memblock; + pa_packet *packet; + uint32_t shm_info[PA_PSTREAM_SHM_MAX]; + void *data; + size_t index; +}; + struct pa_pstream { PA_REFCNT_DECLARE; pa_mainloop_api *mainloop; pa_defer_event *defer_event; pa_iochannel *io; + pa_srchannel *sr; pa_queue *send_queue; @@ -132,14 +142,7 @@ struct pa_pstream { pa_memchunk memchunk; } write; - struct { - pa_pstream_descriptor descriptor; - pa_memblock *memblock; - pa_packet *packet; - uint32_t shm_info[PA_PSTREAM_SHM_MAX]; - void *data; - size_t index; - } read; + struct pstream_read readio, readsr; bool use_shm; pa_memimport *import; @@ -172,7 +175,7 @@ struct pa_pstream { }; static int do_write(pa_pstream *p); -static int do_read(pa_pstream *p); +static int do_read(pa_pstream *p, struct pstream_read *re); static void do_pstream_read_write(pa_pstream *p) { pa_assert(p); @@ -182,8 +185,13 @@ static void do_pstream_read_write(pa_pstream *p) { p->mainloop->defer_enable(p->defer_event, 0); + if (!p->dead && p->sr) { + do_write(p); + while (!p->dead && do_read(p, &p->readsr) == 0); + } + if (!p->dead && pa_iochannel_is_readable(p->io)) { - if (do_read(p) < 0) + if (do_read(p, &p->readio) < 0) goto fail; } else if (!p->dead && pa_iochannel_is_hungup(p->io)) goto fail; @@ -208,6 +216,17 @@ fail: pa_pstream_unref(p); } +static bool sr_callback(pa_srchannel *sr, void *userdata) { + pa_pstream *p = userdata; + + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(p->sr == sr); + + do_pstream_read_write(p); + return p->sr != NULL; +} + static void io_callback(pa_iochannel*io, void *userdata) { pa_pstream *p = userdata; @@ -289,11 +308,17 @@ static void pstream_free(pa_pstream *p) { if (p->write.memchunk.memblock) pa_memblock_unref(p->write.memchunk.memblock); - if (p->read.memblock) - pa_memblock_unref(p->read.memblock); + if (p->readsr.memblock) + pa_memblock_unref(p->readsr.memblock); + + if (p->readsr.packet) + pa_packet_unref(p->readsr.packet); + + if (p->readio.memblock) + pa_memblock_unref(p->readio.memblock); - if (p->read.packet) - pa_packet_unref(p->read.packet); + if (p->readio.packet) + pa_packet_unref(p->readio.packet); pa_xfree(p); } @@ -606,8 +631,9 @@ static int do_write(pa_pstream *p) { p->send_ancil_now = false; } else #endif - - if ((r = pa_iochannel_write(p->io, d, l)) < 0) + if (p->sr) + r = pa_srchannel_write(p->sr, d, l); + else if ((r = pa_iochannel_write(p->io, d, l)) < 0) goto fail; if (release_memblock) @@ -639,7 +665,7 @@ fail: return -1; } -static int do_read(pa_pstream *p) { +static int do_read(pa_pstream *p, struct pstream_read *re) { void *d; size_t l; ssize_t r; @@ -647,23 +673,29 @@ static int do_read(pa_pstream *p) { pa_assert(p); pa_assert(PA_REFCNT_VALUE(p) > 0); - if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) { - d = (uint8_t*) p->read.descriptor + p->read.index; - l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index; + if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) { + d = (uint8_t*) re->descriptor + re->index; + l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index; } else { - pa_assert(p->read.data || p->read.memblock); + pa_assert(re->data || re->memblock); - if (p->read.data) - d = p->read.data; + if (re->data) + d = re->data; else { - d = pa_memblock_acquire(p->read.memblock); - release_memblock = p->read.memblock; + d = pa_memblock_acquire(re->memblock); + release_memblock = re->memblock; } - d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE; - l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE); + d = (uint8_t*) d + re->index - PA_PSTREAM_DESCRIPTOR_SIZE; + l = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (re->index - PA_PSTREAM_DESCRIPTOR_SIZE); } + if (re == &p->readsr) { + r = pa_srchannel_read(p->sr, d, l); + if (r == 0) + return 1; + } + else #ifdef HAVE_CREDS { pa_ancil b; @@ -689,13 +721,13 @@ static int do_read(pa_pstream *p) { if (release_memblock) pa_memblock_release(release_memblock); - p->read.index += (size_t) r; + re->index += (size_t) r; - if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) { + if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) { uint32_t flags, length, channel; /* Reading of frame descriptor complete */ - flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]); + flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]); if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) { pa_log_warn("Received SHM frame on a socket where SHM is disabled."); @@ -706,10 +738,10 @@ static int do_read(pa_pstream *p) { /* This is a SHM memblock release frame with no payload */ -/* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ +/* pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ pa_assert(p->export); - pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); + pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); goto frame_done; @@ -717,24 +749,24 @@ static int do_read(pa_pstream *p) { /* This is a SHM memblock revoke frame with no payload */ -/* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ +/* pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ pa_assert(p->import); - pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); + pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); goto frame_done; } - length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]); + length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]); if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) { pa_log_warn("Received invalid frame size: %lu", (unsigned long) length); return -1; } - pa_assert(!p->read.packet && !p->read.memblock); + pa_assert(!re->packet && !re->memblock); - channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]); + channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]); if (channel == (uint32_t) -1) { @@ -744,8 +776,8 @@ static int do_read(pa_pstream *p) { } /* Frame is a packet frame */ - p->read.packet = pa_packet_new(length); - p->read.data = p->read.packet->data; + re->packet = pa_packet_new(length); + re->data = re->packet->data; } else { @@ -756,20 +788,20 @@ static int do_read(pa_pstream *p) { if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) { - if (length != sizeof(p->read.shm_info)) { + if (length != sizeof(re->shm_info)) { pa_log_warn("Received SHM memblock frame with invalid frame length."); return -1; } /* Frame is a memblock frame referencing an SHM memblock */ - p->read.data = p->read.shm_info; + re->data = re->shm_info; } else if ((flags & PA_FLAG_SHMMASK) == 0) { /* Frame is a memblock frame */ - p->read.memblock = pa_memblock_new(p->mempool, length); - p->read.data = NULL; + re->memblock = pa_memblock_new(p->mempool, length); + re->data = NULL; } else { pa_log_warn("Received memblock frame with invalid flags value."); @@ -777,74 +809,74 @@ static int do_read(pa_pstream *p) { } } - } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) { + } else if (re->index > PA_PSTREAM_DESCRIPTOR_SIZE) { /* Frame payload available */ - if (p->read.memblock && p->receive_memblock_callback) { + if (re->memblock && p->receive_memblock_callback) { /* Is this memblock data? Than pass it to the user */ - l = (p->read.index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r; + l = (re->index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (re->index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r; if (l > 0) { pa_memchunk chunk; - chunk.memblock = p->read.memblock; - chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l; + chunk.memblock = re->memblock; + chunk.index = re->index - PA_PSTREAM_DESCRIPTOR_SIZE - l; chunk.length = l; if (p->receive_memblock_callback) { int64_t offset; offset = (int64_t) ( - (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | - (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO])))); + (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | + (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO])))); p->receive_memblock_callback( p, - ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), + ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), offset, - ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, + ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, &chunk, p->receive_memblock_callback_userdata); } /* Drop seek info for following callbacks */ - p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = - p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = - p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0; + re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = + re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = + re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0; } } /* Frame complete */ - if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) { + if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) { - if (p->read.memblock) { + if (re->memblock) { /* This was a memblock frame. We can unref the memblock now */ - pa_memblock_unref(p->read.memblock); + pa_memblock_unref(re->memblock); - } else if (p->read.packet) { + } else if (re->packet) { if (p->receive_packet_callback) #ifdef HAVE_CREDS - p->receive_packet_callback(p, p->read.packet, &p->read_ancil, p->receive_packet_callback_userdata); + p->receive_packet_callback(p, re->packet, &p->read_ancil, p->receive_packet_callback_userdata); #else - p->receive_packet_callback(p, p->read.packet, NULL, p->receive_packet_callback_userdata); + p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata); #endif - pa_packet_unref(p->read.packet); + pa_packet_unref(re->packet); } else { pa_memblock *b; - uint32_t flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]); + uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]); pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA); pa_assert(p->import); if (!(b = pa_memimport_get(p->import, - ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]), - ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]), - ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]), - ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]), + ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]), + ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]), + ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]), + ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]), !!(flags & PA_FLAG_SHMWRITABLE)))) { if (pa_log_ratelimit(PA_LOG_DEBUG)) @@ -857,17 +889,17 @@ static int do_read(pa_pstream *p) { chunk.memblock = b; chunk.index = 0; - chunk.length = b ? pa_memblock_get_length(b) : ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]); + chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]); offset = (int64_t) ( - (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | - (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO])))); + (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | + (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO])))); p->receive_memblock_callback( p, - ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), + ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), offset, - ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, + ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, &chunk, p->receive_memblock_callback_userdata); } @@ -883,10 +915,10 @@ static int do_read(pa_pstream *p) { return 0; frame_done: - p->read.memblock = NULL; - p->read.packet = NULL; - p->read.index = 0; - p->read.data = NULL; + re->memblock = NULL; + re->packet = NULL; + re->index = 0; + re->data = NULL; #ifdef HAVE_CREDS p->read_ancil.creds_valid = false; @@ -988,6 +1020,8 @@ void pa_pstream_unlink(pa_pstream *p) { p->dead = true; + pa_pstream_set_srchannel(p, NULL); + if (p->import) { pa_memimport_free(p->import); p->import = NULL; @@ -1040,3 +1074,26 @@ bool pa_pstream_get_shm(pa_pstream *p) { return p->use_shm; } + +void pa_pstream_set_srchannel(pa_pstream *p, pa_srchannel *sr) { + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); + + if (sr == p->sr) + return; + + /* Make sure we flush the queue, so we don't write half a package on one channel and half on the other */ + while (!p->dead && (do_write(p) > 0 || p->write.current)) { + pa_log_debug("Flushing commands before %s srchannel...", sr ? "setting up" : "tearing down"); + } + + if (p->sr) { + pa_srchannel_free(p->sr); + p->sr = NULL; + } + + if (sr) { + p->sr = sr; + pa_srchannel_set_callback(sr, sr_callback, p); + } +} diff --git a/src/pulsecore/pstream.h b/src/pulsecore/pstream.h index 4961570..7ba4627 100644 --- a/src/pulsecore/pstream.h +++ b/src/pulsecore/pstream.h @@ -31,6 +31,7 @@ #include <pulsecore/packet.h> #include <pulsecore/memblock.h> #include <pulsecore/iochannel.h> +#include <pulsecore/srchannel.h> #include <pulsecore/memchunk.h> #include <pulsecore/creds.h> #include <pulsecore/macro.h> @@ -66,4 +67,8 @@ bool pa_pstream_is_pending(pa_pstream *p); void pa_pstream_enable_shm(pa_pstream *p, bool enable); bool pa_pstream_get_shm(pa_pstream *p); +/* Enables shared ringbuffer channel. Note that the srchannel is now owned by the pstream. + Setting sr to NULL will free any existing srchannel. */ +void pa_pstream_set_srchannel(pa_pstream *p, pa_srchannel *sr); + #endif -- 1.9.1