The shared ringbuffer memblock must be writable by both sides. This makes it possible to send such a memblock over a pstream without the "both sides writable" information getting lost. Signed-off-by: David Henningsson <david.henningsson at canonical.com> --- src/pulsecore/memblock.c | 40 +++++++++++++++++++++++++++++++++++----- src/pulsecore/memblock.h | 6 +++++- src/pulsecore/pstream.c | 32 ++++++++++++++++++++++---------- 3 files changed, 62 insertions(+), 16 deletions(-) diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c index 8da0fcd..5ef2aa9 100644 --- a/src/pulsecore/memblock.c +++ b/src/pulsecore/memblock.c @@ -97,6 +97,7 @@ struct pa_memimport_segment { pa_shm memory; pa_memtrap *trap; unsigned n_blocks; + bool writable; }; /* A collection of multiple segments */ @@ -146,6 +147,7 @@ struct pa_mempool { pa_shm memory; size_t block_size; unsigned n_blocks; + bool is_remote_writable; pa_atomic_t n_init; @@ -303,6 +305,19 @@ static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) { } /* No lock necessary */ +bool pa_mempool_is_remote_writable(pa_mempool *p) { + pa_assert(p); + return p->is_remote_writable; +} + +/* No lock necessary */ +void pa_mempool_set_is_remote_writable(pa_mempool *p, bool writable) { + pa_assert(p); + pa_assert(!writable || pa_mempool_is_shared(p)); + p->is_remote_writable = writable; +} + +/* No lock necessary */ pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) { pa_memblock *b = NULL; struct mempool_slot *slot; @@ -416,6 +431,14 @@ pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, pa_free } /* No lock necessary */ +bool pa_memblock_is_ours(pa_memblock *b) { + pa_assert(b); + pa_assert(PA_REFCNT_VALUE(b) > 0); + + return b->type != PA_MEMBLOCK_IMPORTED; +} + +/* No lock necessary */ bool pa_memblock_is_read_only(pa_memblock *b) { pa_assert(b); pa_assert(PA_REFCNT_VALUE(b) > 0); @@ -905,7 +928,7 @@ pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i); /* Should be called locked */ -static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) { +static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id, bool writable) { pa_memimport_segment* seg; if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX) @@ -913,11 +936,12 @@ static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) { seg = pa_xnew0(pa_memimport_segment, 1); - if (pa_shm_attach(&seg->memory, shm_id, false) < 0) { + if (pa_shm_attach(&seg->memory, shm_id, writable) < 0) { pa_xfree(seg); return NULL; } + seg->writable = writable; seg->import = i; seg->trap = pa_memtrap_add(seg->memory.ptr, seg->memory.size); @@ -973,7 +997,8 @@ void pa_memimport_free(pa_memimport *i) { } /* Self-locked */ -pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) { +pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, + size_t offset, size_t size, bool writable) { pa_memblock *b = NULL; pa_memimport_segment *seg; @@ -990,9 +1015,14 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_i goto finish; if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id)))) - if (!(seg = segment_attach(i, shm_id))) + if (!(seg = segment_attach(i, shm_id, writable))) goto finish; + if (writable != seg->writable) { + pa_log("Cannot open segment - writable status changed!"); + goto finish; + } + if (offset+size > seg->memory.size) goto finish; @@ -1002,7 +1032,7 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_i PA_REFCNT_INIT(b); b->pool = i->pool; b->type = PA_MEMBLOCK_IMPORTED; - b->read_only = true; + b->read_only = !writable; b->is_silence = false; pa_atomic_ptr_store(&b->data, (uint8_t*) seg->memory.ptr + offset); b->length = size; diff --git a/src/pulsecore/memblock.h b/src/pulsecore/memblock.h index 502f207..d60f3c3 100644 --- a/src/pulsecore/memblock.h +++ b/src/pulsecore/memblock.h @@ -104,6 +104,7 @@ function is not multiple caller safe, i.e. needs to be locked manually if called from more than one thread at the same time. */ void pa_memblock_unref_fixed(pa_memblock*b); +bool pa_memblock_is_ours(pa_memblock *b); bool pa_memblock_is_read_only(pa_memblock *b); bool pa_memblock_is_silence(pa_memblock *b); bool pa_memblock_ref_is_one(pa_memblock *b); @@ -125,12 +126,15 @@ const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p); void pa_mempool_vacuum(pa_mempool *p); int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id); bool pa_mempool_is_shared(pa_mempool *p); +bool pa_mempool_is_remote_writable(pa_mempool *p); +void pa_mempool_set_is_remote_writable(pa_mempool *p, bool writable); size_t pa_mempool_block_size_max(pa_mempool *p); /* For receiving blocks from other nodes */ pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata); void pa_memimport_free(pa_memimport *i); -pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size); +pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, + size_t offset, size_t size, bool writable); int pa_memimport_process_revoke(pa_memimport *i, uint32_t block_id); /* For sending blocks to other nodes */ diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c index 22ea250..539c4a2 100644 --- a/src/pulsecore/pstream.c +++ b/src/pulsecore/pstream.c @@ -45,11 +45,12 @@ #include "pstream.h" /* We piggyback information if audio data blocks are stored in SHM on the seek mode */ -#define PA_FLAG_SHMDATA 0x80000000LU -#define PA_FLAG_SHMRELEASE 0x40000000LU -#define PA_FLAG_SHMREVOKE 0xC0000000LU -#define PA_FLAG_SHMMASK 0xFF000000LU -#define PA_FLAG_SEEKMASK 0x000000FFLU +#define PA_FLAG_SHMDATA 0x80000000LU +#define PA_FLAG_SHMRELEASE 0x40000000LU +#define PA_FLAG_SHMREVOKE 0xC0000000LU +#define PA_FLAG_SHMMASK 0xFF000000LU +#define PA_FLAG_SEEKMASK 0x000000FFLU +#define PA_FLAG_SHMWRITABLE 0x00800000LU /* The sequence descriptor header consists of 5 32bit integers: */ enum { @@ -504,10 +505,15 @@ static void prepare_next_write_item(pa_pstream *p) { size_t offset, length; uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE]; size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX; + pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock); + pa_memexport *current_export; - pa_assert(p->export); + if (p->mempool == current_pool) + pa_assert_se(current_export = p->export); + else + pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p)); - if (pa_memexport_put(p->export, + if (pa_memexport_put(current_export, p->write.current->chunk.memblock, &block_id, &shm_id, @@ -515,6 +521,8 @@ static void prepare_next_write_item(pa_pstream *p) { &length) >= 0) { flags |= PA_FLAG_SHMDATA; + if (pa_mempool_is_remote_writable(current_pool)) + flags |= PA_FLAG_SHMWRITABLE; send_payload = false; shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id); @@ -527,6 +535,9 @@ static void prepare_next_write_item(pa_pstream *p) { } /* else */ /* pa_log_warn("Failed to export memory block."); */ + + if (current_export != p->export) + pa_memexport_free(current_export); } if (send_payload) { @@ -824,8 +835,8 @@ static int do_read(pa_pstream *p) { pa_packet_unref(p->read.packet); } else { pa_memblock *b; - - pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA); + uint32_t flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]); + pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA); pa_assert(p->import); @@ -833,7 +844,8 @@ static int do_read(pa_pstream *p) { ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]), ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]), ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]), - ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) { + ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]), + !!(flags & PA_FLAG_SHMWRITABLE)))) { if (pa_log_ratelimit(PA_LOG_DEBUG)) pa_log_debug("Failed to import memory block."); -- 1.9.1