Now that we have the necessary infrastructure for memexporting and memimporting a memfd memblock, extend that support higher up in the chain with pstreams. A PulseAudio endpoint can now transparently send a memfd memblock to the other end by simply calling pa_pstream_send_memblock(). Some DRY refactorings are needed, but they will be done in their own commits due to the complexity of the pstreams code. Signed-off-by: Ahmed S. Darwish <darwish.07 at gmail.com> --- src/pulsecore/memblock.c | 8 +- src/pulsecore/memblock.h | 1 + src/pulsecore/pstream.c | 195 +++++++++++++++++++++++++++++++++++++---------- 3 files changed, 158 insertions(+), 46 deletions(-) diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c index 04caf34..5f0dcbe 100644 --- a/src/pulsecore/memblock.c +++ b/src/pulsecore/memblock.c @@ -561,7 +561,7 @@ pa_mempool* pa_memblock_get_pool(pa_memblock *b) { return b->pool; } -static pa_mem_type_t memblock_get_mem_type(pa_memblock *b) { +pa_mem_type_t pa_memblock_get_mem_type(pa_memblock *b) { switch (b->type) { case PA_MEMBLOCK_IMPORTED: pa_assert(b->per_type.imported.segment); @@ -582,7 +582,7 @@ static pa_mem_type_t memblock_get_mem_type(pa_memblock *b) { }; int pa_memblock_get_memfd_fd(pa_memblock *b) { - pa_assert(memblock_get_mem_type(b) == PA_MEMORY_SHARED_MEMFD); + pa_assert(pa_memblock_get_mem_type(b) == PA_MEMORY_SHARED_MEMFD); switch (b->type) { case PA_MEMBLOCK_IMPORTED: @@ -1460,7 +1460,7 @@ int pa_memexport_shm_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, ui { pa_shm *shm = NULL; pa_assert(shm_id); - pa_assert(memblock_get_mem_type(b) == PA_MEMORY_SHARED_POSIX); + pa_assert(pa_memblock_get_mem_type(b) == PA_MEMORY_SHARED_POSIX); if (memexport_put(e, b, block_id, (pa_mem **)&shm, offset, size) < 0) return -1; @@ -1475,7 +1475,7 @@ int pa_memexport_memfd_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, { pa_memfd *memfd = NULL; pa_assert(memfd_fd); - pa_assert(memblock_get_mem_type(b) == PA_MEMORY_SHARED_MEMFD); + 
pa_assert(pa_memblock_get_mem_type(b) == PA_MEMORY_SHARED_MEMFD); if (memexport_put(e, b, block_id, (pa_mem **)&memfd, offset, size) < 0) return -1; diff --git a/src/pulsecore/memblock.h b/src/pulsecore/memblock.h index fc84c17..359669e 100644 --- a/src/pulsecore/memblock.h +++ b/src/pulsecore/memblock.h @@ -118,6 +118,7 @@ void pa_memblock_release(pa_memblock *b); size_t pa_memblock_get_length(pa_memblock *b); pa_mempool * pa_memblock_get_pool(pa_memblock *b); +pa_mem_type_t pa_memblock_get_mem_type(pa_memblock *b); int pa_memblock_get_memfd_fd(pa_memblock *b); pa_memblock *pa_memblock_will_need(pa_memblock *b); diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c index 6e1963f..a65b0a6 100644 --- a/src/pulsecore/pstream.c +++ b/src/pulsecore/pstream.c @@ -46,6 +46,7 @@ #define PA_FLAG_SHMDATA 0x80000000LU #define PA_FLAG_SHMRELEASE 0x40000000LU #define PA_FLAG_SHMREVOKE 0xC0000000LU +#define PA_FLAG_MEMFD_FD 0x01000000LU #define PA_FLAG_SHMMASK 0xFF000000LU #define PA_FLAG_SEEKMASK 0x000000FFLU #define PA_FLAG_SHMWRITABLE 0x00800000LU @@ -69,6 +70,14 @@ enum { PA_PSTREAM_SHM_MAX }; +/* If we have a memfd block, this info follows the descriptor */ +enum { + PA_PSTREAM_MEMFD_BLOCKID, + PA_PSTREAM_MEMFD_INDEX, + PA_PSTREAM_MEMFD_LENGTH, + PA_PSTREAM_MEMFD_MAX +}; + typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX]; #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t)) @@ -85,7 +94,9 @@ PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree); struct item_info { enum { PA_PSTREAM_ITEM_PACKET, - PA_PSTREAM_ITEM_MEMBLOCK, + PA_PSTREAM_ITEM_MEMBLOCK_SHARED_POSIX, + PA_PSTREAM_ITEM_MEMBLOCK_SHARED_MEMFD, + PA_PSTREAM_ITEM_MEMBLOCK_PRIVATE, PA_PSTREAM_ITEM_SHMRELEASE, PA_PSTREAM_ITEM_SHMREVOKE } type; @@ -111,7 +122,10 @@ struct pstream_read { pa_pstream_descriptor descriptor; pa_memblock *memblock; pa_packet *packet; - uint32_t shm_info[PA_PSTREAM_SHM_MAX]; + struct { + uint32_t shm_info[PA_PSTREAM_SHM_MAX]; + uint32_t 
memfd_info[PA_PSTREAM_MEMFD_MAX]; + } per_type; void *data; size_t index; }; @@ -278,11 +292,17 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *poo return p; } +static bool item_type_is_memblock(struct item_info *i) { + return i->type == PA_PSTREAM_ITEM_MEMBLOCK_SHARED_POSIX || + i->type == PA_PSTREAM_ITEM_MEMBLOCK_SHARED_MEMFD || + i->type == PA_PSTREAM_ITEM_MEMBLOCK_PRIVATE; +} + static void item_free(void *item) { struct item_info *i = item; pa_assert(i); - if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) { + if (item_type_is_memblock(i)) { pa_assert(i->chunk.memblock); pa_memblock_unref(i->chunk.memblock); } else if (i->type == PA_PSTREAM_ITEM_PACKET) { @@ -376,7 +396,6 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items)))) i = pa_xnew(struct item_info, 1); - i->type = PA_PSTREAM_ITEM_MEMBLOCK; n = PA_MIN(length, bsm); i->chunk.index = chunk->index + idx; @@ -390,6 +409,27 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa i->with_ancil_data = false; #endif + switch (pa_memblock_get_mem_type(chunk->memblock)) { + case PA_MEMORY_SHARED_POSIX: + i->type = PA_PSTREAM_ITEM_MEMBLOCK_SHARED_POSIX; + break; + + case PA_MEMORY_SHARED_MEMFD: +#ifdef HAVE_CREDS + i->type = PA_PSTREAM_ITEM_MEMBLOCK_SHARED_MEMFD; +#else + pa_assert_not_reached(); +#endif + break; + + case PA_MEMORY_PRIVATE: + i->type = PA_PSTREAM_ITEM_MEMBLOCK_PRIVATE; + break; + + default: + pa_assert_not_reached(); + } + pa_queue_push(p->send_queue, i); idx += n; @@ -516,8 +556,9 @@ static void prepare_next_write_item(pa_pstream *p) { } else { uint32_t flags; bool send_payload = true; + int memexport_result = -1; - pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK); + pa_assert(item_type_is_memblock(p->write.current)); pa_assert(p->write.current->chunk.memblock); p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel); @@ 
-527,10 +568,6 @@ static void prepare_next_write_item(pa_pstream *p) { flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK); if (p->use_shm) { - uint32_t block_id, shm_id; - size_t offset, length; - uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE]; - size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX; pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock); pa_memexport *current_export; @@ -539,28 +576,70 @@ static void prepare_next_write_item(pa_pstream *p) { else pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p)); - if (pa_memexport_shm_put(current_export, - p->write.current->chunk.memblock, - &block_id, - &shm_id, - &offset, - &length) >= 0) { + if (p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK_SHARED_POSIX) { + uint32_t block_id, shm_id; + size_t offset, length; + uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE]; + size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX; + + memexport_result = pa_memexport_shm_put(current_export, + p->write.current->chunk.memblock, + &block_id, + &shm_id, + &offset, + &length); + + if (memexport_result >= 0) { + flags |= PA_FLAG_SHMDATA; + + shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id); + shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id); + shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index)); + shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length); + + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size); + p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size; + } + + } else if (p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK_SHARED_MEMFD) { + uint32_t block_id; + int memfd_fd = -1; + size_t offset, length; + uint32_t *memfd_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE]; + size_t memfd_size = sizeof(uint32_t) * PA_PSTREAM_MEMFD_MAX; + + memexport_result = 
pa_memexport_memfd_put(current_export, + p->write.current->chunk.memblock, + &block_id, + &memfd_fd, + &offset, + &length); + + if (memexport_result >= 0) { + flags |= PA_FLAG_MEMFD_FD; + + memfd_info[PA_PSTREAM_MEMFD_BLOCKID] = htonl(block_id); + memfd_info[PA_PSTREAM_MEMFD_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index)); + memfd_info[PA_PSTREAM_MEMFD_LENGTH] = htonl((uint32_t) p->write.current->chunk.length); + + pa_assert(memfd_fd >= 0); + p->write.current->with_ancil_data = true; + p->write.current->ancil_data.creds_valid = false; + p->write.current->ancil_data.nfd = 1; + p->write.current->ancil_data.fds[0] = memfd_fd; + + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(memfd_size); + p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + memfd_size; + } + } - flags |= PA_FLAG_SHMDATA; + if (memexport_result >= 0) { if (pa_mempool_is_remote_writable(current_pool)) flags |= PA_FLAG_SHMWRITABLE; send_payload = false; - - shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id); - shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id); - shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index)); - shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length); - - p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size); - p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size; } -/* else */ -/* pa_log_warn("Failed to export memory block."); */ +/* else */ +/* pa_log_warn("Failed to export memory block."); */ if (current_export != p->export) pa_memexport_free(current_export); @@ -833,14 +912,22 @@ static int do_read(pa_pstream *p, struct pstream_read *re) { if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) { - if (length != sizeof(re->shm_info)) { + if (length != sizeof(re->per_type.shm_info)) { pa_log_warn("Received SHM memblock frame with invalid frame length."); return -1; } - /* Frame is a memblock frame referencing an SHM memblock */ - re->data = re->shm_info; + /* 
Frame is a memblock frame referencing a posix SHM memblock */ + re->data = re->per_type.shm_info; + } else if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_MEMFD_FD) { + + if (length != sizeof(re->per_type.memfd_info)) { + pa_log_warn("Received memfd memblock frame with invalid frame length."); + return -1; + } + /* Frame is a memblock frame referencing a memfd memblock */ + re->data = &re->per_type.memfd_info; } else if ((flags & PA_FLAG_SHMMASK) == 0) { /* Frame is a memblock frame */ @@ -874,30 +961,54 @@ static int do_read(pa_pstream *p, struct pstream_read *re) { pa_packet_unref(re->packet); } else { - pa_memblock *b; + pa_memblock *b = NULL; uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]); - pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA); - pa_assert(p->import); - if (!(b = pa_memimport_shm_get(p->import, - ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]), - ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]), - ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]), - ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]), - !!(flags & PA_FLAG_SHMWRITABLE)))) { - - if (pa_log_ratelimit(PA_LOG_DEBUG)) - pa_log_debug("Failed to import memory block."); + if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) { + b = pa_memimport_shm_get(p->import, + ntohl(re->per_type.shm_info[PA_PSTREAM_SHM_BLOCKID]), + ntohl(re->per_type.shm_info[PA_PSTREAM_SHM_SHMID]), + ntohl(re->per_type.shm_info[PA_PSTREAM_SHM_INDEX]), + ntohl(re->per_type.shm_info[PA_PSTREAM_SHM_LENGTH]), + !!(flags & PA_FLAG_SHMWRITABLE)); + } else if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_MEMFD_FD) { + int memfd_fd; + pa_assert(p->read_ancil_data.nfd == 1); + + memfd_fd = p->read_ancil_data.fds[0]; + pa_assert(memfd_fd >= 0); + + b = pa_memimport_memfd_get(p->import, + ntohl(re->per_type.memfd_info[PA_PSTREAM_MEMFD_BLOCKID]), + memfd_fd, + ntohl(re->per_type.memfd_info[PA_PSTREAM_MEMFD_INDEX]), + ntohl(re->per_type.memfd_info[PA_PSTREAM_MEMFD_LENGTH]), + !!(flags & PA_FLAG_SHMWRITABLE)); + } else { + pa_assert_not_reached(); } 
+ if (!b && pa_log_ratelimit(PA_LOG_DEBUG)) + pa_log_debug("Failed to import memory block."); + if (p->receive_memblock_callback) { int64_t offset; pa_memchunk chunk; chunk.memblock = b; chunk.index = 0; - chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]); + + if (b) { + chunk.length = pa_memblock_get_length(b); + } else { + if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) + chunk.length = ntohl(re->per_type.shm_info[PA_PSTREAM_SHM_LENGTH]); + else if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_MEMFD_FD) { + chunk.length = ntohl(re->per_type.memfd_info[PA_PSTREAM_MEMFD_LENGTH]); + } else + pa_assert_not_reached(); + } offset = (int64_t) ( (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | -- Darwish http://darwish.chasingpointers.com