[PATCH 7/9] reftable/block: reuse zstream when writing log blocks

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



While most reftable blocks are written to disk as-is, blocks for log
records are compressed with zlib. To compress them we use `compress2()`,
which is a simple wrapper around the more complex `zstream` interface
that would require multiple function invocations.

One downside of this interface is that `compress2()` will reallocate
internal state of the `zstream` interface on every single invocation.
Consequently, as we call `compress2()` for every single log block which
we are about to write, this can lead to quite some memory allocation
churn.

Refactor the code so that the block writer reuses a `zstream`. This
significantly reduces the number of bytes allocated when writing many
refs in a single transaction, as demonstrated by the following benchmark
that writes 100k refs.

Before:

  HEAP SUMMARY:
      in use at exit: 671,931 bytes in 151 blocks
    total heap usage: 22,631,887 allocs, 22,631,736 frees, 1,854,670,793 bytes allocated

After:

  HEAP SUMMARY:
      in use at exit: 671,931 bytes in 151 blocks
    total heap usage: 22,620,528 allocs, 22,620,377 frees, 1,245,549,984 bytes allocated

Signed-off-by: Patrick Steinhardt <ps@xxxxxx>
---
 reftable/block.c  | 83 +++++++++++++++++++++++++++++++----------------
 reftable/block.h  |  1 +
 reftable/writer.c |  4 +++
 3 files changed, 60 insertions(+), 28 deletions(-)

diff --git a/reftable/block.c b/reftable/block.c
index e2a2cee58d..1fa74d418f 100644
--- a/reftable/block.c
+++ b/reftable/block.c
@@ -76,6 +76,10 @@ void block_writer_init(struct block_writer *bw, uint8_t typ, uint8_t *buf,
 	bw->entries = 0;
 	bw->restart_len = 0;
 	bw->last_key.len = 0;
+	if (!bw->zstream) {
+		REFTABLE_CALLOC_ARRAY(bw->zstream, 1);
+		deflateInit(bw->zstream, 9);
+	}
 }
 
 uint8_t block_writer_type(struct block_writer *bw)
@@ -139,39 +143,60 @@ int block_writer_finish(struct block_writer *w)
 	w->next += 2;
 	put_be24(w->buf + 1 + w->header_off, w->next);
 
+	/*
+	 * Log records are stored zlib-compressed. Note that the compression
+	 * also spans over the restart points we have just written.
+	 */
 	if (block_writer_type(w) == BLOCK_TYPE_LOG) {
 		int block_header_skip = 4 + w->header_off;
-		uLongf src_len = w->next - block_header_skip;
-		uLongf dest_cap = src_len * 1.001 + 12;
-		uint8_t *compressed;
-
-		REFTABLE_ALLOC_ARRAY(compressed, dest_cap);
-
-		while (1) {
-			uLongf out_dest_len = dest_cap;
-			int zresult = compress2(compressed, &out_dest_len,
-						w->buf + block_header_skip,
-						src_len, 9);
-			if (zresult == Z_BUF_ERROR && dest_cap < LONG_MAX) {
-				dest_cap *= 2;
-				compressed =
-					reftable_realloc(compressed, dest_cap);
-				if (compressed)
-					continue;
-			}
-
-			if (Z_OK != zresult) {
-				reftable_free(compressed);
-				return REFTABLE_ZLIB_ERROR;
-			}
-
-			memcpy(w->buf + block_header_skip, compressed,
-			       out_dest_len);
-			w->next = out_dest_len + block_header_skip;
+		uLongf src_len = w->next - block_header_skip, compressed_len;
+		unsigned char *compressed;
+		int ret;
+
+		ret = deflateReset(w->zstream);
+		if (ret != Z_OK)
+			return REFTABLE_ZLIB_ERROR;
+
+		/*
+		 * Precompute the upper bound of how many bytes the compressed
+		 * data may end up with. Combined with `Z_FINISH`, `deflate()`
+		 * is guaranteed to return `Z_STREAM_END`.
+		 */
+		compressed_len = deflateBound(w->zstream, src_len);
+		REFTABLE_ALLOC_ARRAY(compressed, compressed_len);
+
+		w->zstream->next_out = compressed;
+		w->zstream->avail_out = compressed_len;
+		w->zstream->next_in = w->buf + block_header_skip;
+		w->zstream->avail_in = src_len;
+
+		/*
+		 * We want to perform all compression in a single step,
+		 * which is why we can pass Z_FINISH here. The output
+		 * buffer was sized via `deflateBound()`, so `deflate()`
+		 * is expected to return `Z_STREAM_END` on this one call.
+		 *
+		 * Any other result, including `Z_OK` and `Z_BUF_ERROR`,
+		 * is treated as an error; we do not retry.
+		 */
+		ret = deflate(w->zstream, Z_FINISH);
+		if (ret != Z_STREAM_END) {
 			reftable_free(compressed);
-			break;
+			return REFTABLE_ZLIB_ERROR;
 		}
+
+		/*
+		 * Overwrite the uncompressed data we have already written and
+		 * adjust the `next` pointer to point right after the
+		 * compressed data.
+		 */
+		memcpy(w->buf + block_header_skip, compressed,
+		       w->zstream->total_out);
+		w->next = w->zstream->total_out + block_header_skip;
+
+		reftable_free(compressed);
 	}
+
 	return w->next;
 }
 
@@ -425,6 +450,8 @@ int block_reader_seek(struct block_reader *br, struct block_iter *it,
 
 void block_writer_release(struct block_writer *bw)
 {
+	deflateEnd(bw->zstream);
+	FREE_AND_NULL(bw->zstream);
 	FREE_AND_NULL(bw->restarts);
 	strbuf_release(&bw->last_key);
 	/* the block is not owned. */
diff --git a/reftable/block.h b/reftable/block.h
index 47acc62c0a..1375957fc8 100644
--- a/reftable/block.h
+++ b/reftable/block.h
@@ -18,6 +18,7 @@ license that can be found in the LICENSE file or at
  * allocation overhead.
  */
 struct block_writer {
+	z_stream *zstream;
 	uint8_t *buf;
 	uint32_t block_size;
 
diff --git a/reftable/writer.c b/reftable/writer.c
index d347ec4cc6..51e663bb19 100644
--- a/reftable/writer.c
+++ b/reftable/writer.c
@@ -153,6 +153,10 @@ void reftable_writer_free(struct reftable_writer *w)
 {
 	if (!w)
 		return;
+	if (w->block_writer) {
+		block_writer_release(w->block_writer);
+		w->block_writer = NULL;
+	}
 	reftable_free(w->block);
 	reftable_free(w);
 }
-- 
2.44.GIT

Attachment: signature.asc
Description: PGP signature


[Index of Archives]     [Linux Kernel Development]     [Gcc Help]     [IETF Annouce]     [DCCP]     [Netdev]     [Networking]     [Security]     [V4L]     [Bugtraq]     [Yosemite]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Linux SCSI]     [Fedora Users]

  Powered by Linux