Introduce support for marshalling and unmarshalling a memfd memblock between different PulseAudio endpoints. Signed-off-by: Ahmed S. Darwish <darwish.07 at gmail.com> --- src/pulsecore/memblock.c | 220 ++++++++++++++++++++++++++++++++++++++++------ src/pulsecore/memblock.h | 8 +- src/pulsecore/pstream.c | 24 ++--- src/tests/memblock-test.c | 8 +- 4 files changed, 214 insertions(+), 46 deletions(-) diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c index 6047ead..04caf34 100644 --- a/src/pulsecore/memblock.c +++ b/src/pulsecore/memblock.c @@ -97,7 +97,16 @@ struct pa_memblock { struct pa_memimport_segment { pa_memimport *import; - pa_shm memory; + + pa_mem_type_t mem_type; + union { + pa_mem mem; + union { + pa_shm shm; + pa_memfd memfd; + } per_type; + }; + pa_memtrap *trap; unsigned n_blocks; bool writable; @@ -108,7 +117,22 @@ struct pa_memimport { pa_mutex *mutex; pa_mempool *pool; - pa_hashmap *segments; + pa_hashmap *shm_segments; + + /* Unlike what is done with Posix SHM segments above, we cannot track + * memfd-based memimport segments using our file-descriptor ID as key. + * File descriptors are recyclable by nature: the same fd number could + * map different memory regions at different points of time. + * + * Moreover, even if the other endpoint sent us the _very same_ fd + * twice over a unix domain socket, the kernel will pass them to us + * as different fd numbers. + * + * Thus only count the number of memfd segments allocated to this + * memimport and ensure that this counter is zero upon release. 
+ */ + int n_memfd_segments; + pa_hashmap *blocks; /* Called whenever an imported memory block is no longer @@ -537,6 +561,46 @@ pa_mempool* pa_memblock_get_pool(pa_memblock *b) { return b->pool; } +static pa_mem_type_t memblock_get_mem_type(pa_memblock *b) { + switch (b->type) { + case PA_MEMBLOCK_IMPORTED: + pa_assert(b->per_type.imported.segment); + return b->per_type.imported.segment->mem_type; + + case PA_MEMBLOCK_POOL: + case PA_MEMBLOCK_POOL_EXTERNAL: + return b->pool->mem_type; + + case PA_MEMBLOCK_APPENDED: + case PA_MEMBLOCK_FIXED: + case PA_MEMBLOCK_USER: + return PA_MEMORY_PRIVATE; + + default: + pa_assert_not_reached(); + } +}; + +int pa_memblock_get_memfd_fd(pa_memblock *b) { + pa_assert(memblock_get_mem_type(b) == PA_MEMORY_SHARED_MEMFD); + + switch (b->type) { + case PA_MEMBLOCK_IMPORTED: + pa_assert(b->per_type.imported.segment); + return b->per_type.imported.segment->per_type.memfd.fd; + + case PA_MEMBLOCK_POOL: + case PA_MEMBLOCK_POOL_EXTERNAL: + return b->pool->per_type.memfd.fd; + + case PA_MEMBLOCK_APPENDED: + case PA_MEMBLOCK_FIXED: + case PA_MEMBLOCK_USER: + default: + pa_assert_not_reached(); + } +} + /* No lock necessary */ pa_memblock* pa_memblock_ref(pa_memblock*b) { pa_assert(b); @@ -964,7 +1028,8 @@ pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void i = pa_xnew(pa_memimport, 1); i->mutex = pa_mutex_new(true, true); i->pool = p; - i->segments = pa_hashmap_new(NULL, NULL); + i->shm_segments = pa_hashmap_new(NULL, NULL); + i->n_memfd_segments = 0; i->blocks = pa_hashmap_new(NULL, NULL); i->release_cb = cb; i->userdata = userdata; @@ -978,25 +1043,59 @@ pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i); +static void segment_late_init(pa_memimport_segment *seg, pa_memimport *i, pa_mem_type_t mem_type, bool writable) { + seg->writable = writable; + seg->import = i; + seg->mem_type = mem_type; + + 
pa_assert(seg->mem.ptr != NULL); + seg->trap = pa_memtrap_add(seg->mem.ptr, seg->mem.size); +} + /* Should be called locked */ -static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id, bool writable) { +static pa_memimport_segment* segment_shm_attach(pa_memimport *i, uint32_t shm_id, bool writable) { pa_memimport_segment* seg; - if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX) + if ((seg = pa_hashmap_get(i->shm_segments, PA_UINT32_TO_PTR(shm_id)))) + return seg; + + if (pa_hashmap_size(i->shm_segments) >= PA_MEMIMPORT_SEGMENTS_MAX) return NULL; seg = pa_xnew0(pa_memimport_segment, 1); - if (pa_shm_attach(&seg->memory, shm_id, writable) < 0) { + if (pa_shm_attach(&seg->per_type.shm, shm_id, writable) < 0) { pa_xfree(seg); return NULL; } - seg->writable = writable; - seg->import = i; - seg->trap = pa_memtrap_add(seg->memory.ptr, seg->memory.size); + segment_late_init(seg, i, PA_MEMORY_SHARED_POSIX, writable); + + pa_hashmap_put(i->shm_segments, PA_UINT32_TO_PTR(seg->per_type.shm.id), seg); + return seg; +} + +/* Should be called locked */ +static pa_memimport_segment* segment_memfd_attach(pa_memimport *i, int fd, bool writable) { + pa_memimport_segment* seg; - pa_hashmap_put(i->segments, PA_UINT32_TO_PTR(seg->memory.id), seg); + /* FIXME: Introduce a proper memfd-tracking mechanism. We receive + * different fd numbers even for the very same fds passed by the + * other endpoint. This can lead to an _unbounded_ increase in + * the number of `pa_memimport_segment' allocations created here. 
*/ +/* if (i->n_memfd_segments >= PA_MEMIMPORT_SEGMENTS_MAX) + return NULL; */ + + seg = pa_xnew0(pa_memimport_segment, 1); + + if (pa_memfd_attach(&seg->per_type.memfd, fd, writable) < 0) { + pa_xfree(seg); + return NULL; + } + + segment_late_init(seg, i, PA_MEMORY_SHARED_MEMFD, writable); + + ++ i->n_memfd_segments; return seg; } @@ -1004,8 +1103,19 @@ static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id, bo static void segment_detach(pa_memimport_segment *seg) { pa_assert(seg); - pa_hashmap_remove(seg->import->segments, PA_UINT32_TO_PTR(seg->memory.id)); - pa_shm_free(&seg->memory); + switch (seg->mem_type) { + case PA_MEMORY_SHARED_POSIX: + pa_hashmap_remove(seg->import->shm_segments, PA_UINT32_TO_PTR(seg->per_type.shm.id)); + pa_shm_free(&seg->per_type.shm); + break; + case PA_MEMORY_SHARED_MEMFD: + pa_assert(seg->import->n_memfd_segments >= 1); + -- seg->import->n_memfd_segments; + pa_memfd_free(&seg->per_type.memfd); + break; + default: + pa_assert_not_reached(); + } if (seg->trap) pa_memtrap_remove(seg->trap); @@ -1025,7 +1135,8 @@ void pa_memimport_free(pa_memimport *i) { while ((b = pa_hashmap_first(i->blocks))) memblock_replace_import(b); - pa_assert(pa_hashmap_size(i->segments) == 0); + pa_assert(pa_hashmap_size(i->shm_segments) == 0); + pa_assert(i->n_memfd_segments == 0); pa_mutex_unlock(i->mutex); @@ -1040,7 +1151,7 @@ void pa_memimport_free(pa_memimport *i) { pa_mutex_unlock(i->pool->mutex); pa_hashmap_free(i->blocks); - pa_hashmap_free(i->segments); + pa_hashmap_free(i->shm_segments); pa_mutex_free(i->mutex); @@ -1048,10 +1159,10 @@ void pa_memimport_free(pa_memimport *i) { } /* Self-locked */ -pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, - size_t offset, size_t size, bool writable) { +static pa_memblock* pa_memimport_get(pa_memimport *i, pa_mem_type_t type, uint32_t block_id, + uint32_t shm_id, int memfd_fd, size_t offset, size_t size, bool writable) { pa_memblock *b = NULL; - 
pa_memimport_segment *seg; + pa_memimport_segment *seg = NULL; pa_assert(i); @@ -1065,16 +1176,29 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_i if (pa_hashmap_size(i->blocks) >= PA_MEMIMPORT_SLOTS_MAX) goto finish; - if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id)))) - if (!(seg = segment_attach(i, shm_id, writable))) - goto finish; + switch (type) { + case PA_MEMORY_SHARED_POSIX: + pa_assert(memfd_fd == -1); + seg = segment_shm_attach(i, shm_id, writable); + break; + case PA_MEMORY_SHARED_MEMFD: + pa_assert(shm_id == (uint32_t)-1); + seg = segment_memfd_attach(i, memfd_fd, writable); + break; + case PA_MEMORY_PRIVATE: + default: + pa_assert_not_reached(); + } + + if (!seg) + goto finish; if (writable != seg->writable) { pa_log("Cannot open segment - writable status changed!"); goto finish; } - if (offset+size > seg->memory.size) + if (offset+size > seg->mem.size) goto finish; if (!(b = pa_flist_pop(PA_STATIC_FLIST_GET(unused_memblocks)))) @@ -1085,7 +1209,7 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_i b->type = PA_MEMBLOCK_IMPORTED; b->read_only = !writable; b->is_silence = false; - pa_atomic_ptr_store(&b->data, (uint8_t*) seg->memory.ptr + offset); + pa_atomic_ptr_store(&b->data, (uint8_t*) seg->mem.ptr + offset); b->length = size; pa_atomic_store(&b->n_acquired, 0); pa_atomic_store(&b->please_signal, 0); @@ -1104,6 +1228,16 @@ finish: return b; } +pa_memblock *pa_memimport_shm_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, + size_t offset, size_t size, bool writable) { + return pa_memimport_get(i, PA_MEMORY_SHARED_POSIX, block_id, shm_id, -1, offset, size, writable); +} + +pa_memblock *pa_memimport_memfd_get(pa_memimport *i, uint32_t block_id, int fd, + size_t offset, size_t size, bool writable) { + return pa_memimport_get(i, PA_MEMORY_SHARED_MEMFD, block_id, -1, fd, offset, size, writable); +} + int pa_memimport_process_revoke(pa_memimport *i, uint32_t id) { 
pa_memblock *b; int ret = 0; @@ -1260,15 +1394,15 @@ static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) { } /* Self-locked */ -int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size) { - pa_shm *memory; +static int memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, pa_mem **memory_ptr, size_t *offset, size_t *size) { struct memexport_slot *slot; + pa_mem *memory; void *data; pa_assert(e); pa_assert(b); pa_assert(block_id); - pa_assert(shm_id); + pa_assert(memory_ptr); pa_assert(offset); pa_assert(size); pa_assert(b->pool == e->pool); @@ -1300,17 +1434,17 @@ int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32 if (b->type == PA_MEMBLOCK_IMPORTED) { pa_assert(b->per_type.imported.segment); - memory = &b->per_type.imported.segment->memory; + *memory_ptr = &b->per_type.imported.segment->mem; } else { pa_assert(b->type == PA_MEMBLOCK_POOL || b->type == PA_MEMBLOCK_POOL_EXTERNAL); pa_assert(b->pool); - memory = &b->pool->per_type.shm; + *memory_ptr = &b->pool->mem; } + memory = *memory_ptr; pa_assert(data >= memory->ptr); pa_assert((uint8_t*) data + b->length <= (uint8_t*) memory->ptr + memory->size); - *shm_id = memory->id; *offset = (size_t) ((uint8_t*) data - (uint8_t*) memory->ptr); *size = b->length; @@ -1321,3 +1455,33 @@ int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32 return 0; } + +int pa_memexport_shm_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size) +{ + pa_shm *shm = NULL; + pa_assert(shm_id); + pa_assert(memblock_get_mem_type(b) == PA_MEMORY_SHARED_POSIX); + + if (memexport_put(e, b, block_id, (pa_mem **)&shm, offset, size) < 0) + return -1; + + pa_assert(shm); + *shm_id = shm->id; + + return 0; +} + +int pa_memexport_memfd_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, int *memfd_fd, size_t *offset, size_t * size) +{ + pa_memfd 
*memfd = NULL; + pa_assert(memfd_fd); + pa_assert(memblock_get_mem_type(b) == PA_MEMORY_SHARED_MEMFD); + + if (memexport_put(e, b, block_id, (pa_mem **)&memfd, offset, size) < 0) + return -1; + + pa_assert(memfd); + *memfd_fd = memfd->fd; + + return 0; +} diff --git a/src/pulsecore/memblock.h b/src/pulsecore/memblock.h index 184ba55..fc84c17 100644 --- a/src/pulsecore/memblock.h +++ b/src/pulsecore/memblock.h @@ -118,6 +118,7 @@ void pa_memblock_release(pa_memblock *b); size_t pa_memblock_get_length(pa_memblock *b); pa_mempool * pa_memblock_get_pool(pa_memblock *b); +int pa_memblock_get_memfd_fd(pa_memblock *b); pa_memblock *pa_memblock_will_need(pa_memblock *b); @@ -135,14 +136,17 @@ size_t pa_mempool_block_size_max(pa_mempool *p); /* For receiving blocks from other nodes */ pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata); void pa_memimport_free(pa_memimport *i); -pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, +pa_memblock *pa_memimport_shm_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size, bool writable); +pa_memblock *pa_memimport_memfd_get(pa_memimport *i, uint32_t block_id, int fd, size_t offset, + size_t size, bool writable); int pa_memimport_process_revoke(pa_memimport *i, uint32_t block_id); /* For sending blocks to other nodes */ pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata); void pa_memexport_free(pa_memexport *e); -int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t *size); +int pa_memexport_shm_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size); +int pa_memexport_memfd_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, int *memfd_fd, size_t *offset, size_t * size); int pa_memexport_process_release(pa_memexport *e, uint32_t id); #endif diff --git a/src/pulsecore/pstream.c 
b/src/pulsecore/pstream.c index 8c14fbb..6e1963f 100644 --- a/src/pulsecore/pstream.c +++ b/src/pulsecore/pstream.c @@ -539,12 +539,12 @@ static void prepare_next_write_item(pa_pstream *p) { else pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p)); - if (pa_memexport_put(current_export, - p->write.current->chunk.memblock, - &block_id, - &shm_id, - &offset, - &length) >= 0) { + if (pa_memexport_shm_put(current_export, + p->write.current->chunk.memblock, + &block_id, + &shm_id, + &offset, + &length) >= 0) { flags |= PA_FLAG_SHMDATA; if (pa_mempool_is_remote_writable(current_pool)) @@ -880,12 +880,12 @@ static int do_read(pa_pstream *p, struct pstream_read *re) { pa_assert(p->import); - if (!(b = pa_memimport_get(p->import, - ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]), - ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]), - ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]), - ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]), - !!(flags & PA_FLAG_SHMWRITABLE)))) { + if (!(b = pa_memimport_shm_get(p->import, + ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]), + ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]), + ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]), + ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]), + !!(flags & PA_FLAG_SHMWRITABLE)))) { if (pa_log_ratelimit(PA_LOG_DEBUG)) pa_log_debug("Failed to import memory block."); diff --git a/src/tests/memblock-test.c b/src/tests/memblock-test.c index e4c9d0a..d1cda87 100644 --- a/src/tests/memblock-test.c +++ b/src/tests/memblock-test.c @@ -122,22 +122,22 @@ START_TEST (memblock_test) { import_c = pa_memimport_new(pool_c, release_cb, (void*) "C"); fail_unless(import_b != NULL); - r = pa_memexport_put(export_a, mb_a, &id, &shm_id, &offset, &size); + r = pa_memexport_shm_put(export_a, mb_a, &id, &shm_id, &offset, &size); fail_unless(r >= 0); fail_unless(shm_id == id_a); pa_log("A: Memory block exported as %u", id); - mb_b = pa_memimport_get(import_b, id, shm_id, offset, size, false); + mb_b = pa_memimport_shm_get(import_b, id, 
shm_id, offset, size, false); fail_unless(mb_b != NULL); - r = pa_memexport_put(export_b, mb_b, &id, &shm_id, &offset, &size); + r = pa_memexport_shm_put(export_b, mb_b, &id, &shm_id, &offset, &size); fail_unless(r >= 0); fail_unless(shm_id == id_a || shm_id == id_b); pa_memblock_unref(mb_b); pa_log("B: Memory block exported as %u", id); - mb_c = pa_memimport_get(import_c, id, shm_id, offset, size, false); + mb_c = pa_memimport_shm_get(import_c, id, shm_id, offset, size, false); fail_unless(mb_c != NULL); x = pa_memblock_acquire(mb_c); pa_log_debug("1 data=%s", x); -- Darwish http://darwish.chasingpointers.com