From: Peter Meerwald <p.meerwald@xxxxxxxxxxxxxxxxxx> The idea is similar to commit b4342845d ("Optimize write of smaller packages"), but applied to the read path: when a new descriptor is expected, read into a slightly larger mini buffer so that the descriptor and a small payload can often be fetched with a single read(). Signed-off-by: Peter Meerwald <pmeerw at pmeerw.net> --- src/pulsecore/pstream.c | 90 +++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 73 insertions(+), 17 deletions(-) diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c index 8846d14..91868a7 100644 --- a/src/pulsecore/pstream.c +++ b/src/pulsecore/pstream.c @@ -76,7 +76,8 @@ typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX]; #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t)) #define PA_PSTREAM_SHM_SIZE (PA_PSTREAM_SHM_MAX*sizeof(uint32_t)) -#define MINIBUF_SIZE (256) +#define WRITE_MINIBUF_SIZE 256 +#define READ_MINIBUF_SIZE 40 /* To allow uploading a single sample in one frame, this value should be the * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h. @@ -120,7 +121,10 @@ struct item_info { }; struct pstream_read { - pa_pstream_descriptor descriptor; + union { + uint8_t minibuf[READ_MINIBUF_SIZE]; + pa_pstream_descriptor descriptor; + }; pa_memblock *memblock; pa_packet *packet; uint32_t shm_info[PA_PSTREAM_SHM_MAX]; @@ -143,7 +147,7 @@ struct pa_pstream { struct { union { - uint8_t minibuf[MINIBUF_SIZE]; + uint8_t minibuf[WRITE_MINIBUF_SIZE]; pa_pstream_descriptor descriptor; }; struct item_info* current; @@ -535,7 +539,7 @@ static void prepare_next_write_item(pa_pstream *p) { p->write.data = (void *) pa_packet_data(p->write.current->per_type.packet, &plen); p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) plen); - if (plen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) { + if (plen <= WRITE_MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) { memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen); p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen; } @@ -548,7 +552,7 @@ static void prepare_next_write_item(pa_pstream *p) { p->write.data = 
(void *) pa_tagstruct_data(p->write.current->per_type.tagstruct, &tlen); p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) tlen); - if (tlen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) { + if (tlen <= WRITE_MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) { memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, tlen); p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + tlen; } @@ -938,7 +942,12 @@ static int do_read(pa_pstream *p, struct pstream_read *re) { pa_assert(p); pa_assert(PA_REFCNT_VALUE(p) > 0); - if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) { + if (re->index == 0) { + /* special case: expecting a new descriptor but provide extra space; + * often we can save a read() */ + d = (uint8_t*) re->minibuf; + l = READ_MINIBUF_SIZE; + } else if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) { d = (uint8_t*) re->descriptor + re->index; l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index; } else { @@ -989,18 +998,65 @@ static int do_read(pa_pstream *p, struct pstream_read *re) { if (release_memblock) pa_memblock_release(release_memblock); - re->index += (size_t) r; - - if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) { - handle_descriptor(p, re); - } else if (re->index > PA_PSTREAM_DESCRIPTOR_SIZE) { - /* Frame payload available */ - if (re->memblock && p->receive_memblock_callback) - handle_payload(p, re, r); + if (re->index == 0 && r > (int) PA_PSTREAM_DESCRIPTOR_SIZE) { + uint8_t *m = re->minibuf; + + while (r > 0) { + int frame_remaining; + if (r >= (int) PA_PSTREAM_DESCRIPTOR_SIZE) { + /* Handle descriptor */ + memmove(re->descriptor, m, PA_PSTREAM_DESCRIPTOR_SIZE); + frame_remaining = handle_descriptor(p, re); + r -= PA_PSTREAM_DESCRIPTOR_SIZE; + m += PA_PSTREAM_DESCRIPTOR_SIZE; + } else { + /* Move remaining descriptor part */ + memmove(re->descriptor, m, r); + re->index = r; + break; + } - /* Frame complete */ - if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) - frame_complete(p, re); + if 
(frame_remaining > 0) { + /* Copy data from minibuf */ + pa_assert(re->data || re->memblock); + + l = PA_MIN(ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), r); + if (re->data) + memcpy(re->data, m, l); + else { + d = pa_memblock_acquire(re->memblock); + memcpy(d, m, l); + pa_memblock_release(re->memblock); + } + r -= l; + m += l; + re->index = PA_PSTREAM_DESCRIPTOR_SIZE + l; + + /* Frame payload available */ + if (re->memblock && p->receive_memblock_callback) + handle_payload(p, re, l); + + /* Frame complete */ + if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) + frame_complete(p, re); + } + else + frame_done(p, re); + } + } else { + re->index += (size_t) r; + + if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) { + handle_descriptor(p, re); + } else if (re->index > PA_PSTREAM_DESCRIPTOR_SIZE) { + /* Frame payload available */ + if (re->memblock && p->receive_memblock_callback) + handle_payload(p, re, r); + + /* Frame complete */ + if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) + frame_complete(p, re); + } } return 0; -- 1.9.1