I'd like to see this one pushed, as I was thinking of writing something on top of it. But see comments below On 2014-11-05 00:26, Peter Meerwald wrote: > From: Peter Meerwald <p.meerwald at bct-electronic.com> > > no functional change, just using several funtion to break up that monstrosity s/funtion/functions > > Signed-off-by: Peter Meerwald <pmeerw at pmeerw.net> > --- > src/pulsecore/pstream.c | 400 ++++++++++++++++++++++++------------------------ > 1 file changed, 204 insertions(+), 196 deletions(-) > > diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c > index 69f9abd..3926353 100644 > --- a/src/pulsecore/pstream.c > +++ b/src/pulsecore/pstream.c > @@ -731,6 +731,205 @@ fail: > return -1; > } > > +static int handle_descriptor(pa_pstream *p, struct pstream_read *re) { > + uint32_t flags, length, channel; > + > + pa_assert(p); > + pa_assert(re); > + > + flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]); > + > + if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) { > + pa_log_warn("Received SHM frame on a socket where SHM is disabled."); > + return -1; > + } > + > + if (flags == PA_FLAG_SHMRELEASE) { > + /* This is a SHM memblock release frame with no payload */ > + /* pa_log("Got release frame for %u", ntohl(*desc[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ > + > + pa_assert(p->export); > + pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); > + > + goto frame_done; > + > + } else if (flags == PA_FLAG_SHMREVOKE) { > + /* This is a SHM memblock revoke frame with no payload */ > + /* pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ > + > + pa_assert(p->import); > + pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); > + > + goto frame_done; > + } > + > + length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]); > + > + if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) { > + pa_log_warn("Received invalid frame 
size: %lu", (unsigned long) length); > + return -1; > + } > + > + pa_assert(!re->packet && !re->memblock); > + > + channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]); > + > + if (channel == (uint32_t) -1) { > + size_t plen; > + if (flags != 0) { > + pa_log_warn("Received packet frame with invalid flags value."); > + return -1; > + } > + > + /* Frame is a packet frame */ > + re->packet = pa_packet_new(length); > + re->data = (void *) pa_packet_data(re->packet, &plen); > + > + } else { > + > + if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) { > + pa_log_warn("Received memblock frame with invalid seek mode."); > + return -1; > + } > + > + if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) { > + if (length != sizeof(re->shm_info)) { > + pa_log_warn("Received SHM memblock frame with invalid frame length."); > + return -1; > + } > + > + /* Frame is a memblock frame referencing an SHM memblock */ > + re->data = re->shm_info; > + > + } else if ((flags & PA_FLAG_SHMMASK) == 0) { > + > + /* Frame is a memblock frame */ > + re->memblock = pa_memblock_new(p->mempool, length); > + re->data = NULL; // XXX > + } else { > + pa_log_warn("Received memblock frame with invalid flags value."); > + return -1; > + } > + } > + > + return 1; > + > +frame_done: > + re->index = 0; > + > + return 0; > +} Did you also mean to call frame_done() here? handle_descriptor() returns a value (-1 on error, 0 when the frame is already done, 1 otherwise), but the call in do_read() below ignores it, so the error returns are silently dropped. It is also confusing to have both a label and a function named "frame_done". > + > +static void frame_done(pa_pstream *p, struct pstream_read *re) { > + re->memblock = NULL; > + re->packet = NULL; > + re->index = 0; > + re->data = NULL; > + > +#ifdef HAVE_CREDS > + p->read_ancil_data.creds_valid = false; > + p->read_ancil_data.nfd = 0; > +#endif > +} > + > +static void frame_complete(pa_pstream *p, struct pstream_read *re) { > + if (re->memblock) { > + > + /* This was a memblock frame. 
We can unref the memblock now */ > + pa_memblock_unref(re->memblock); > + > + } else if (re->packet) { > + > + if (p->receive_packet_callback) > +#ifdef HAVE_CREDS > + p->receive_packet_callback(p, re->packet, &p->read_ancil_data, p->receive_packet_callback_userdata); > +#else > + p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata); > +#endif > + > + pa_packet_unref(re->packet); > + } else { > + pa_memblock *b; > + uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]); > + pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA); > + > + pa_assert(p->import); > + > + if (!(b = pa_memimport_get(p->import, > + ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]), > + ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]), > + ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]), > + ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]), > + !!(flags & PA_FLAG_SHMWRITABLE)))) { > + > + if (pa_log_ratelimit(PA_LOG_DEBUG)) > + pa_log_debug("Failed to import memory block."); > + } > + > + if (p->receive_memblock_callback) { > + int64_t offset; > + pa_memchunk chunk; > + > + chunk.memblock = b; > + chunk.index = 0; > + chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]); > + > + offset = (int64_t) ( > + (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | > + (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO])))); > + > + p->receive_memblock_callback( > + p, > + ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), > + offset, > + ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, > + &chunk, > + p->receive_memblock_callback_userdata); > + } > + > + if (b) > + pa_memblock_unref(b); > + } > + > + frame_done(p, re); > +} > + > +static void handle_payload(pa_pstream *p, struct pstream_read *re, ssize_t r) { > + size_t l; > + > + /* Is this memblock data? Than pass it to the user */ > + l = (re->index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? 
(size_t) (re->index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r; > + > + if (l > 0) { > + pa_memchunk chunk; > + > + chunk.memblock = re->memblock; > + chunk.index = re->index - PA_PSTREAM_DESCRIPTOR_SIZE - l; > + chunk.length = l; > + > + if (p->receive_memblock_callback) { > + int64_t offset; > + > + offset = (int64_t) ( > + (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | > + (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO])))); > + > + p->receive_memblock_callback( > + p, > + ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), > + offset, > + ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, > + &chunk, > + p->receive_memblock_callback_userdata); > + } > + > + /* Drop seek info for following callbacks */ > + re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = > + re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = > + re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0; > + } > +} > + > static int do_read(pa_pstream *p, struct pstream_read *re) { > void *d; > size_t l; > @@ -793,210 +992,19 @@ static int do_read(pa_pstream *p, struct pstream_read *re) { > re->index += (size_t) r; > > if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) { > - uint32_t flags, length, channel; > - /* Reading of frame descriptor complete */ > - > - flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]); > - > - if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) { > - pa_log_warn("Received SHM frame on a socket where SHM is disabled."); > - return -1; > - } > - > - if (flags == PA_FLAG_SHMRELEASE) { > - > - /* This is a SHM memblock release frame with no payload */ > - > -/* pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ > - > - pa_assert(p->export); > - pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); > - > - goto frame_done; > - > - } else if (flags == PA_FLAG_SHMREVOKE) { > - > - /* This is a SHM memblock revoke frame with no 
payload */ > - > -/* pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ > - > - pa_assert(p->import); > - pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); > - > - goto frame_done; > - } > - > - length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]); > - > - if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) { > - pa_log_warn("Received invalid frame size: %lu", (unsigned long) length); > - return -1; > - } > - > - pa_assert(!re->packet && !re->memblock); > - > - channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]); > - > - if (channel == (uint32_t) -1) { > - size_t plen; > - > - if (flags != 0) { > - pa_log_warn("Received packet frame with invalid flags value."); > - return -1; > - } > - > - /* Frame is a packet frame */ > - re->packet = pa_packet_new(length); > - re->data = (void *) pa_packet_data(re->packet, &plen); > - > - } else { > - > - if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) { > - pa_log_warn("Received memblock frame with invalid seek mode."); > - return -1; > - } > - > - if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) { > - > - if (length != sizeof(re->shm_info)) { > - pa_log_warn("Received SHM memblock frame with invalid frame length."); > - return -1; > - } > - > - /* Frame is a memblock frame referencing an SHM memblock */ > - re->data = re->shm_info; > - > - } else if ((flags & PA_FLAG_SHMMASK) == 0) { > - > - /* Frame is a memblock frame */ > - > - re->memblock = pa_memblock_new(p->mempool, length); > - re->data = NULL; > - } else { > - > - pa_log_warn("Received memblock frame with invalid flags value."); > - return -1; > - } > - } > - > + handle_descriptor(p, re); > } else if (re->index > PA_PSTREAM_DESCRIPTOR_SIZE) { > /* Frame payload available */ > - > - if (re->memblock && p->receive_memblock_callback) { > - > - /* Is this memblock data? Than pass it to the user */ > - l = (re->index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? 
(size_t) (re->index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r; > - > - if (l > 0) { > - pa_memchunk chunk; > - > - chunk.memblock = re->memblock; > - chunk.index = re->index - PA_PSTREAM_DESCRIPTOR_SIZE - l; > - chunk.length = l; > - > - if (p->receive_memblock_callback) { > - int64_t offset; > - > - offset = (int64_t) ( > - (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | > - (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO])))); > - > - p->receive_memblock_callback( > - p, > - ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), > - offset, > - ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, > - &chunk, > - p->receive_memblock_callback_userdata); > - } > - > - /* Drop seek info for following callbacks */ > - re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = > - re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = > - re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0; > - } > - } > + if (re->memblock && p->receive_memblock_callback) > + handle_payload(p, re, r); > > /* Frame complete */ > - if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) { > - > - if (re->memblock) { > - > - /* This was a memblock frame. 
We can unref the memblock now */ > - pa_memblock_unref(re->memblock); > - > - } else if (re->packet) { > - > - if (p->receive_packet_callback) > -#ifdef HAVE_CREDS > - p->receive_packet_callback(p, re->packet, &p->read_ancil_data, p->receive_packet_callback_userdata); > -#else > - p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata); > -#endif > - > - pa_packet_unref(re->packet); > - } else { > - pa_memblock *b; > - uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]); > - pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA); > - > - pa_assert(p->import); > - > - if (!(b = pa_memimport_get(p->import, > - ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]), > - ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]), > - ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]), > - ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]), > - !!(flags & PA_FLAG_SHMWRITABLE)))) { > - > - if (pa_log_ratelimit(PA_LOG_DEBUG)) > - pa_log_debug("Failed to import memory block."); > - } > - > - if (p->receive_memblock_callback) { > - int64_t offset; > - pa_memchunk chunk; > - > - chunk.memblock = b; > - chunk.index = 0; > - chunk.length = b ? 
pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]); > - > - offset = (int64_t) ( > - (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | > - (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO])))); > - > - p->receive_memblock_callback( > - p, > - ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), > - offset, > - ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, > - &chunk, > - p->receive_memblock_callback_userdata); > - } > - > - if (b) > - pa_memblock_unref(b); > - } > - > - goto frame_done; > - } > + if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) > + frame_complete(p, re); > } > > return 0; > > -frame_done: > - re->memblock = NULL; > - re->packet = NULL; > - re->index = 0; > - re->data = NULL; > - > -#ifdef HAVE_CREDS > - p->read_ancil_data.creds_valid = false; > - p->read_ancil_data.nfd = 0; > -#endif > - > - return 0; > - > fail: > if (release_memblock) > pa_memblock_release(release_memblock); > -- David Henningsson, Canonical Ltd. https://launchpad.net/~diwic