From: Peter Meerwald <p.meerwald@xxxxxxxxxxxxxxxxxx> no functional change, just using several funtion to break up that monstrosity Signed-off-by: Peter Meerwald <pmeerw at pmeerw.net> --- src/pulsecore/pstream.c | 400 ++++++++++++++++++++++++------------------------ 1 file changed, 204 insertions(+), 196 deletions(-) diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c index 400df42..8846d14 100644 --- a/src/pulsecore/pstream.c +++ b/src/pulsecore/pstream.c @@ -731,6 +731,205 @@ fail: return -1; } +static int handle_descriptor(pa_pstream *p, struct pstream_read *re) { + uint32_t flags, length, channel; + + pa_assert(p); + pa_assert(re); + + flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]); + + if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) { + pa_log_warn("Received SHM frame on a socket where SHM is disabled."); + return -1; + } + + if (flags == PA_FLAG_SHMRELEASE) { + /* This is a SHM memblock release frame with no payload */ + /* pa_log("Got release frame for %u", ntohl(*desc[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ + + pa_assert(p->export); + pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); + + goto frame_done; + + } else if (flags == PA_FLAG_SHMREVOKE) { + /* This is a SHM memblock revoke frame with no payload */ + /* pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ + + pa_assert(p->import); + pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); + + goto frame_done; + } + + length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]); + + if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) { + pa_log_warn("Received invalid frame size: %lu", (unsigned long) length); + return -1; + } + + pa_assert(!re->packet && !re->memblock); + + channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]); + + if (channel == (uint32_t) -1) { + size_t plen; + if (flags != 0) { + pa_log_warn("Received packet frame with invalid flags value."); + return -1; + } + + /* Frame is a packet frame */ + re->packet = pa_packet_new(length); + re->data = (void *) pa_packet_data(re->packet, &plen); + + } else { + + if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) { + pa_log_warn("Received memblock frame with invalid seek mode."); + return -1; + } + + if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) { + if (length != sizeof(re->shm_info)) { + pa_log_warn("Received SHM memblock frame with invalid frame length."); + return -1; + } + + /* Frame is a memblock frame referencing an SHM memblock */ + re->data = re->shm_info; + + } else if ((flags & PA_FLAG_SHMMASK) == 0) { + + /* Frame is a memblock frame */ + re->memblock = pa_memblock_new(p->mempool, length); + re->data = NULL; // XXX + } else { + pa_log_warn("Received memblock frame with invalid flags value."); + return -1; + } + } + + return 1; + +frame_done: + re->index = 0; + + return 0; +} + +static void frame_done(pa_pstream *p, struct pstream_read *re) { + re->memblock = NULL; + re->packet = NULL; + re->index = 0; + re->data = NULL; + +#ifdef HAVE_CREDS + p->read_ancil.creds_valid = false; + p->read_ancil.nfd = 0; +#endif +} + +static void frame_complete(pa_pstream *p, struct pstream_read *re) { + if (re->memblock) { + + /* This was a memblock frame. We can unref the memblock now */ + pa_memblock_unref(re->memblock); + + } else if (re->packet) { + + if (p->receive_packet_callback) +#ifdef HAVE_CREDS + p->receive_packet_callback(p, re->packet, &p->read_ancil, p->receive_packet_callback_userdata); +#else + p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata); +#endif + + pa_packet_unref(re->packet); + } else { + pa_memblock *b; + uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]); + pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA); + + pa_assert(p->import); + + if (!(b = pa_memimport_get(p->import, + ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]), + ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]), + ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]), + ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]), + !!(flags & PA_FLAG_SHMWRITABLE)))) { + + if (pa_log_ratelimit(PA_LOG_DEBUG)) + pa_log_debug("Failed to import memory block."); + } + + if (p->receive_memblock_callback) { + int64_t offset; + pa_memchunk chunk; + + chunk.memblock = b; + chunk.index = 0; + chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]); + + offset = (int64_t) ( + (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | + (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO])))); + + p->receive_memblock_callback( + p, + ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), + offset, + ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, + &chunk, + p->receive_memblock_callback_userdata); + } + + if (b) + pa_memblock_unref(b); + } + + frame_done(p, re); +} + +static void handle_payload(pa_pstream *p, struct pstream_read *re, ssize_t r) { + size_t l; + + /* Is this memblock data? Than pass it to the user */ + l = (re->index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (re->index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r; + + if (l > 0) { + pa_memchunk chunk; + + chunk.memblock = re->memblock; + chunk.index = re->index - PA_PSTREAM_DESCRIPTOR_SIZE - l; + chunk.length = l; + + if (p->receive_memblock_callback) { + int64_t offset; + + offset = (int64_t) ( + (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | + (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO])))); + + p->receive_memblock_callback( + p, + ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), + offset, + ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, + &chunk, + p->receive_memblock_callback_userdata); + } + + /* Drop seek info for following callbacks */ + re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = + re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = + re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0; + } +} + static int do_read(pa_pstream *p, struct pstream_read *re) { void *d; size_t l; @@ -793,210 +992,19 @@ static int do_read(pa_pstream *p, struct pstream_read *re) { re->index += (size_t) r; if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) { - uint32_t flags, length, channel; - /* Reading of frame descriptor complete */ - - flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]); - - if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) { - pa_log_warn("Received SHM frame on a socket where SHM is disabled."); - return -1; - } - - if (flags == PA_FLAG_SHMRELEASE) { - - /* This is a SHM memblock release frame with no payload */ - -/* pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ - - pa_assert(p->export); - pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); - - goto frame_done; - - } else if (flags == PA_FLAG_SHMREVOKE) { - - /* This is a SHM memblock revoke frame with no payload */ - -/* pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ - - pa_assert(p->import); - pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); - - goto frame_done; - } - - length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]); - - if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) { - pa_log_warn("Received invalid frame size: %lu", (unsigned long) length); - return -1; - } - - pa_assert(!re->packet && !re->memblock); - - channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]); - - if (channel == (uint32_t) -1) { - size_t plen; - - if (flags != 0) { - pa_log_warn("Received packet frame with invalid flags value."); - return -1; - } - - /* Frame is a packet frame */ - re->packet = pa_packet_new(length); - re->data = (void *) pa_packet_data(re->packet, &plen); - - } else { - - if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) { - pa_log_warn("Received memblock frame with invalid seek mode."); - return -1; - } - - if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) { - - if (length != sizeof(re->shm_info)) { - pa_log_warn("Received SHM memblock frame with invalid frame length."); - return -1; - } - - /* Frame is a memblock frame referencing an SHM memblock */ - re->data = re->shm_info; - - } else if ((flags & PA_FLAG_SHMMASK) == 0) { - - /* Frame is a memblock frame */ - - re->memblock = pa_memblock_new(p->mempool, length); - re->data = NULL; - } else { - - pa_log_warn("Received memblock frame with invalid flags value."); - return -1; - } - } - + handle_descriptor(p, re); } else if (re->index > PA_PSTREAM_DESCRIPTOR_SIZE) { /* Frame payload available */ - - if (re->memblock && p->receive_memblock_callback) { - - /* Is this memblock data? Than pass it to the user */ - l = (re->index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (re->index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r; - - if (l > 0) { - pa_memchunk chunk; - - chunk.memblock = re->memblock; - chunk.index = re->index - PA_PSTREAM_DESCRIPTOR_SIZE - l; - chunk.length = l; - - if (p->receive_memblock_callback) { - int64_t offset; - - offset = (int64_t) ( - (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | - (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO])))); - - p->receive_memblock_callback( - p, - ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), - offset, - ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, - &chunk, - p->receive_memblock_callback_userdata); - } - - /* Drop seek info for following callbacks */ - re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = - re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = - re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0; - } - } + if (re->memblock && p->receive_memblock_callback) + handle_payload(p, re, r); /* Frame complete */ - if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) { - - if (re->memblock) { - - /* This was a memblock frame. We can unref the memblock now */ - pa_memblock_unref(re->memblock); - - } else if (re->packet) { - - if (p->receive_packet_callback) -#ifdef HAVE_CREDS - p->receive_packet_callback(p, re->packet, &p->read_ancil, p->receive_packet_callback_userdata); -#else - p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata); -#endif - - pa_packet_unref(re->packet); - } else { - pa_memblock *b; - uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]); - pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA); - - pa_assert(p->import); - - if (!(b = pa_memimport_get(p->import, - ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]), - ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]), - ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]), - ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]), - !!(flags & PA_FLAG_SHMWRITABLE)))) { - - if (pa_log_ratelimit(PA_LOG_DEBUG)) - pa_log_debug("Failed to import memory block."); - } - - if (p->receive_memblock_callback) { - int64_t offset; - pa_memchunk chunk; - - chunk.memblock = b; - chunk.index = 0; - chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]); - - offset = (int64_t) ( - (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | - (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO])))); - - p->receive_memblock_callback( - p, - ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), - offset, - ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, - &chunk, - p->receive_memblock_callback_userdata); - } - - if (b) - pa_memblock_unref(b); - } - - goto frame_done; - } + if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) + frame_complete(p, re); } return 0; -frame_done: - re->memblock = NULL; - re->packet = NULL; - re->index = 0; - re->data = NULL; - -#ifdef HAVE_CREDS - p->read_ancil.creds_valid = false; - p->read_ancil.nfd = 0; -#endif - - return 0; - fail: if (release_memblock) pa_memblock_release(release_memblock); -- 1.9.1