While most reftable blocks are written to disk as-is, blocks for log records are compressed with zlib. To compress them we use `compress2()`, which is a simple wrapper around the more complex `zstream` interface that would require multiple function invocations. One downside of this interface is that `compress2()` will reallocate internal state of the `zstream` interface on every single invocation. Consequently, as we call `compress2()` for every single log block which we are about to write, this can lead to quite some memory allocation churn. Refactor the code so that the block writer reuses a `zstream`. This significantly reduces the number of bytes allocated when writing many refs in a single transaction, as demonstrated by the following benchmark that writes 100k refs in a single transaction. Before: HEAP SUMMARY: in use at exit: 671,931 bytes in 151 blocks total heap usage: 22,631,887 allocs, 22,631,736 frees, 1,854,670,793 bytes allocated After: HEAP SUMMARY: in use at exit: 671,931 bytes in 151 blocks total heap usage: 22,620,528 allocs, 22,620,377 frees, 1,245,549,984 bytes allocated Signed-off-by: Patrick Steinhardt <ps@xxxxxx> --- reftable/block.c | 83 +++++++++++++++++++++++++++++++---------------- reftable/block.h | 1 + reftable/writer.c | 4 +++ 3 files changed, 60 insertions(+), 28 deletions(-) diff --git a/reftable/block.c b/reftable/block.c index e2a2cee58d..1fa74d418f 100644 --- a/reftable/block.c +++ b/reftable/block.c @@ -76,6 +76,10 @@ void block_writer_init(struct block_writer *bw, uint8_t typ, uint8_t *buf, bw->entries = 0; bw->restart_len = 0; bw->last_key.len = 0; + if (!bw->zstream) { + REFTABLE_CALLOC_ARRAY(bw->zstream, 1); + deflateInit(bw->zstream, 9); + } } uint8_t block_writer_type(struct block_writer *bw) @@ -139,39 +143,60 @@ int block_writer_finish(struct block_writer *w) w->next += 2; put_be24(w->buf + 1 + w->header_off, w->next); + /* + * Log records are stored zlib-compressed. Note that the compression + * also spans over the restart points we have just written. + */ if (block_writer_type(w) == BLOCK_TYPE_LOG) { int block_header_skip = 4 + w->header_off; - uLongf src_len = w->next - block_header_skip; - uLongf dest_cap = src_len * 1.001 + 12; - uint8_t *compressed; - - REFTABLE_ALLOC_ARRAY(compressed, dest_cap); - - while (1) { - uLongf out_dest_len = dest_cap; - int zresult = compress2(compressed, &out_dest_len, - w->buf + block_header_skip, - src_len, 9); - if (zresult == Z_BUF_ERROR && dest_cap < LONG_MAX) { - dest_cap *= 2; - compressed = - reftable_realloc(compressed, dest_cap); - if (compressed) - continue; - } - - if (Z_OK != zresult) { - reftable_free(compressed); - return REFTABLE_ZLIB_ERROR; - } - - memcpy(w->buf + block_header_skip, compressed, - out_dest_len); - w->next = out_dest_len + block_header_skip; + uLongf src_len = w->next - block_header_skip, compressed_len; + unsigned char *compressed; + int ret; + + ret = deflateReset(w->zstream); + if (ret != Z_OK) + return REFTABLE_ZLIB_ERROR; + + /* + * Precompute the upper bound of how many bytes the compressed + * data may end up with. Combined with `Z_FINISH`, `deflate()` + * is guaranteed to return `Z_STREAM_END`. + */ + compressed_len = deflateBound(w->zstream, src_len); + REFTABLE_ALLOC_ARRAY(compressed, compressed_len); + + w->zstream->next_out = compressed; + w->zstream->avail_out = compressed_len; + w->zstream->next_in = w->buf + block_header_skip; + w->zstream->avail_in = src_len; + + /* + * We want to perform all decompression in a single + * step, which is why we can pass Z_FINISH here. Note + * that both `Z_OK` and `Z_BUF_ERROR` indicate that we + * need to retry according to documentation. + * + * If the call fails we retry with a bigger output + * buffer. + */ + ret = deflate(w->zstream, Z_FINISH); + if (ret != Z_STREAM_END) { reftable_free(compressed); - break; + return REFTABLE_ZLIB_ERROR; } + + /* + * Overwrite the uncompressed data we have already written and + * adjust the `next` pointer to point right after the + * compressed data. + */ + memcpy(w->buf + block_header_skip, compressed, + w->zstream->total_out); + w->next = w->zstream->total_out + block_header_skip; + + reftable_free(compressed); } + return w->next; } @@ -425,6 +450,8 @@ int block_reader_seek(struct block_reader *br, struct block_iter *it, void block_writer_release(struct block_writer *bw) { + deflateEnd(bw->zstream); + FREE_AND_NULL(bw->zstream); FREE_AND_NULL(bw->restarts); strbuf_release(&bw->last_key); /* the block is not owned. */ diff --git a/reftable/block.h b/reftable/block.h index 47acc62c0a..1375957fc8 100644 --- a/reftable/block.h +++ b/reftable/block.h @@ -18,6 +18,7 @@ license that can be found in the LICENSE file or at * allocation overhead. */ struct block_writer { + z_stream *zstream; uint8_t *buf; uint32_t block_size; diff --git a/reftable/writer.c b/reftable/writer.c index d347ec4cc6..51e663bb19 100644 --- a/reftable/writer.c +++ b/reftable/writer.c @@ -153,6 +153,10 @@ void reftable_writer_free(struct reftable_writer *w) { if (!w) return; + if (w->block_writer) { + block_writer_release(w->block_writer); + w->block_writer = NULL; + } reftable_free(w->block); reftable_free(w); } -- 2.44.GIT
Attachment:
signature.asc
Description: PGP signature