diff mbox series

[7/9] reftable/block: reuse zstream when writing log blocks

Message ID 86dab54dfe4501dfa5e50e5a01513c890b62bb4d.1712078736.git.ps@pks.im (mailing list archive)
State Superseded
Headers show
Series reftable: optimize write performance | expand

Commit Message

Patrick Steinhardt April 2, 2024, 5:30 p.m. UTC
While most reftable blocks are written to disk as-is, blocks for log
records are compressed with zlib. To compress them we use `compress2()`,
which is a simple wrapper around the more complex `zstream` interface
that would require multiple function invocations.

One downside of this interface is that `compress2()` will reallocate
internal state of the `zstream` interface on every single invocation.
Consequently, as we call `compress2()` for every single log block which
we are about to write, this can lead to quite some memory allocation
churn.

Refactor the code so that the block writer reuses a `zstream`. This
significantly reduces the number of bytes allocated when writing many
refs in a single transaction, as demonstrated by the following benchmark
that writes 100k refs in a single transaction.

Before:

  HEAP SUMMARY:
      in use at exit: 671,931 bytes in 151 blocks
    total heap usage: 22,631,887 allocs, 22,631,736 frees, 1,854,670,793 bytes allocated

After:

  HEAP SUMMARY:
      in use at exit: 671,931 bytes in 151 blocks
    total heap usage: 22,620,528 allocs, 22,620,377 frees, 1,245,549,984 bytes allocated

Signed-off-by: Patrick Steinhardt <ps@pks.im>
---
 reftable/block.c  | 83 +++++++++++++++++++++++++++++++----------------
 reftable/block.h  |  1 +
 reftable/writer.c |  4 +++
 3 files changed, 60 insertions(+), 28 deletions(-)

Comments

Junio C Hamano April 3, 2024, 7:35 p.m. UTC | #1
Patrick Steinhardt <ps@pks.im> writes:

> @@ -139,39 +143,60 @@ int block_writer_finish(struct block_writer *w)
>  	w->next += 2;
>  	put_be24(w->buf + 1 + w->header_off, w->next);
>  
> +	/*
> +	 * Log records are stored zlib-compressed. Note that the compression
> +	 * also spans over the restart points we have just written.
> +	 */
>  	if (block_writer_type(w) == BLOCK_TYPE_LOG) {
>  		int block_header_skip = 4 + w->header_off;
> +		uLongf src_len = w->next - block_header_skip, compressed_len;
> +		unsigned char *compressed;
> +		int ret;
> +
> +		ret = deflateReset(w->zstream);
> +		if (ret != Z_OK)
> +			return REFTABLE_ZLIB_ERROR;
> +
> +		/*
> +		 * Precompute the upper bound of how many bytes the compressed
> +		 * data may end up with. Combined with `Z_FINISH`, `deflate()`
> +		 * is guaranteed to return `Z_STREAM_END`.
> +		 */
> +		compressed_len = deflateBound(w->zstream, src_len);
> +		REFTABLE_ALLOC_ARRAY(compressed, compressed_len);

OK.

> +		w->zstream->next_out = compressed;
> +		w->zstream->avail_out = compressed_len;
> +		w->zstream->next_in = w->buf + block_header_skip;
> +		w->zstream->avail_in = src_len;
> +
> +		/*
> +		 * We want to perform all decompression in a single
> +		 * step, which is why we can pass Z_FINISH here. Note
> +		 * that both `Z_OK` and `Z_BUF_ERROR` indicate that we
> +		 * need to retry according to documentation.
> +		 *
> +		 * If the call fails we retry with a bigger output
> +		 * buffer.
> +		 */

I am not sure where the retry is happening, though.

block_writer_finish() is called by writer_flush_nonempty_block()
which returns a negative return to its caller, which is
writer_flush_block().  writer_flush_block() in turn returns a
negative return to its callers from writer_add_record(),
write_finish_section(), and write_object_record().  Nobody seems to
react to REFTABLE_ZLIB_ERROR (other than the reftable/error.c that
stringifies the error for messages).

But we have asked deflateBound() so if we did not get Z_STREAM_END,
wouldn't it mean some data corruption that retrying would not help?

> +		ret = deflate(w->zstream, Z_FINISH);
> +		if (ret != Z_STREAM_END) {
>  			reftable_free(compressed);
> -			break;
> +			return REFTABLE_ZLIB_ERROR;
>  		}
> +
> +		/*
> +		 * Overwrite the uncompressed data we have already written and
> +		 * adjust the `next` pointer to point right after the
> +		 * compressed data.
> +		 */
> +		memcpy(w->buf + block_header_skip, compressed,
> +		       w->zstream->total_out);
> +		w->next = w->zstream->total_out + block_header_skip;
> +
> +		reftable_free(compressed);
>  	}
> +
>  	return w->next;
>  }

OK.

> @@ -425,6 +450,8 @@ int block_reader_seek(struct block_reader *br, struct block_iter *it,
>  
>  void block_writer_release(struct block_writer *bw)
>  {
> +	deflateEnd(bw->zstream);
> +	FREE_AND_NULL(bw->zstream);
>  	FREE_AND_NULL(bw->restarts);
>  	strbuf_release(&bw->last_key);
>  	/* the block is not owned. */
> diff --git a/reftable/block.h b/reftable/block.h
> index 47acc62c0a..1375957fc8 100644
> --- a/reftable/block.h
> +++ b/reftable/block.h
> @@ -18,6 +18,7 @@ license that can be found in the LICENSE file or at
>   * allocation overhead.
>   */
>  struct block_writer {
> +	z_stream *zstream;
>  	uint8_t *buf;
>  	uint32_t block_size;
>  
> diff --git a/reftable/writer.c b/reftable/writer.c
> index d347ec4cc6..51e663bb19 100644
> --- a/reftable/writer.c
> +++ b/reftable/writer.c
> @@ -153,6 +153,10 @@ void reftable_writer_free(struct reftable_writer *w)
>  {
>  	if (!w)
>  		return;
> +	if (w->block_writer) {
> +		block_writer_release(w->block_writer);
> +		w->block_writer = NULL;
> +	}

This smells like an orthogonal fix to an unrelated resource leakage?

>  	reftable_free(w->block);
>  	reftable_free(w);
>  }

Thanks.
Patrick Steinhardt April 4, 2024, 5:36 a.m. UTC | #2
On Wed, Apr 03, 2024 at 12:35:22PM -0700, Junio C Hamano wrote:
> Patrick Steinhardt <ps@pks.im> writes:
> 
> > @@ -139,39 +143,60 @@ int block_writer_finish(struct block_writer *w)
> >  	w->next += 2;
> >  	put_be24(w->buf + 1 + w->header_off, w->next);
> >  
> > +	/*
> > +	 * Log records are stored zlib-compressed. Note that the compression
> > +	 * also spans over the restart points we have just written.
> > +	 */
> >  	if (block_writer_type(w) == BLOCK_TYPE_LOG) {
> >  		int block_header_skip = 4 + w->header_off;
> > +		uLongf src_len = w->next - block_header_skip, compressed_len;
> > +		unsigned char *compressed;
> > +		int ret;
> > +
> > +		ret = deflateReset(w->zstream);
> > +		if (ret != Z_OK)
> > +			return REFTABLE_ZLIB_ERROR;
> > +
> > +		/*
> > +		 * Precompute the upper bound of how many bytes the compressed
> > +		 * data may end up with. Combined with `Z_FINISH`, `deflate()`
> > +		 * is guaranteed to return `Z_STREAM_END`.
> > +		 */
> > +		compressed_len = deflateBound(w->zstream, src_len);
> > +		REFTABLE_ALLOC_ARRAY(compressed, compressed_len);
> 
> OK.
> 
> > +		w->zstream->next_out = compressed;
> > +		w->zstream->avail_out = compressed_len;
> > +		w->zstream->next_in = w->buf + block_header_skip;
> > +		w->zstream->avail_in = src_len;
> > +
> > +		/*
> > +		 * We want to perform all decompression in a single
> > +		 * step, which is why we can pass Z_FINISH here. Note
> > +		 * that both `Z_OK` and `Z_BUF_ERROR` indicate that we
> > +		 * need to retry according to documentation.
> > +		 *
> > +		 * If the call fails we retry with a bigger output
> > +		 * buffer.
> > +		 */
> 
> I am not sure where the retry is happening, though.
> 
> block_writer_finish() is called by writer_flush_nonempty_block()
> which returns a negative return to its caller, which is
> writer_flush_block().  writer_flush_block() in turn returns a
> negative return to its callers from writer_add_record(),
> write_finish_section(), and write_object_record().  Nobody seems to
> react to REFTABLE_ZLIB_ERROR (other than the reftable/error.c that
> stringifies the error for messages).
> 
> But we have asked deflateBound() so if we did not get Z_STREAM_END,
> wouldn't it mean some data corruption that retrying would not help?

Yeha, this comment is stale from a previous iteration.

> > +		ret = deflate(w->zstream, Z_FINISH);
> > +		if (ret != Z_STREAM_END) {
> >  			reftable_free(compressed);
> > -			break;
> > +			return REFTABLE_ZLIB_ERROR;
> >  		}
> > +
> > +		/*
> > +		 * Overwrite the uncompressed data we have already written and
> > +		 * adjust the `next` pointer to point right after the
> > +		 * compressed data.
> > +		 */
> > +		memcpy(w->buf + block_header_skip, compressed,
> > +		       w->zstream->total_out);
> > +		w->next = w->zstream->total_out + block_header_skip;
> > +
> > +		reftable_free(compressed);
> >  	}
> > +
> >  	return w->next;
> >  }
> 
> OK.
> 
> > @@ -425,6 +450,8 @@ int block_reader_seek(struct block_reader *br, struct block_iter *it,
> >  
> >  void block_writer_release(struct block_writer *bw)
> >  {
> > +	deflateEnd(bw->zstream);
> > +	FREE_AND_NULL(bw->zstream);
> >  	FREE_AND_NULL(bw->restarts);
> >  	strbuf_release(&bw->last_key);
> >  	/* the block is not owned. */
> > diff --git a/reftable/block.h b/reftable/block.h
> > index 47acc62c0a..1375957fc8 100644
> > --- a/reftable/block.h
> > +++ b/reftable/block.h
> > @@ -18,6 +18,7 @@ license that can be found in the LICENSE file or at
> >   * allocation overhead.
> >   */
> >  struct block_writer {
> > +	z_stream *zstream;
> >  	uint8_t *buf;
> >  	uint32_t block_size;
> >  
> > diff --git a/reftable/writer.c b/reftable/writer.c
> > index d347ec4cc6..51e663bb19 100644
> > --- a/reftable/writer.c
> > +++ b/reftable/writer.c
> > @@ -153,6 +153,10 @@ void reftable_writer_free(struct reftable_writer *w)
> >  {
> >  	if (!w)
> >  		return;
> > +	if (w->block_writer) {
> > +		block_writer_release(w->block_writer);
> > +		w->block_writer = NULL;
> > +	}
> 
> This smells like an orthogonal fix to an unrelated resource leakage?

True. The memory leak simply never occurred before this change, but in
theory it could have happened. Will move into a separate commit.

Patrick

> >  	reftable_free(w->block);
> >  	reftable_free(w);
> >  }
> 
> Thanks.
diff mbox series

Patch

diff --git a/reftable/block.c b/reftable/block.c
index e2a2cee58d..1fa74d418f 100644
--- a/reftable/block.c
+++ b/reftable/block.c
@@ -76,6 +76,10 @@  void block_writer_init(struct block_writer *bw, uint8_t typ, uint8_t *buf,
 	bw->entries = 0;
 	bw->restart_len = 0;
 	bw->last_key.len = 0;
+	if (!bw->zstream) {
+		REFTABLE_CALLOC_ARRAY(bw->zstream, 1);
+		deflateInit(bw->zstream, 9);
+	}
 }
 
 uint8_t block_writer_type(struct block_writer *bw)
@@ -139,39 +143,60 @@  int block_writer_finish(struct block_writer *w)
 	w->next += 2;
 	put_be24(w->buf + 1 + w->header_off, w->next);
 
+	/*
+	 * Log records are stored zlib-compressed. Note that the compression
+	 * also spans over the restart points we have just written.
+	 */
 	if (block_writer_type(w) == BLOCK_TYPE_LOG) {
 		int block_header_skip = 4 + w->header_off;
-		uLongf src_len = w->next - block_header_skip;
-		uLongf dest_cap = src_len * 1.001 + 12;
-		uint8_t *compressed;
-
-		REFTABLE_ALLOC_ARRAY(compressed, dest_cap);
-
-		while (1) {
-			uLongf out_dest_len = dest_cap;
-			int zresult = compress2(compressed, &out_dest_len,
-						w->buf + block_header_skip,
-						src_len, 9);
-			if (zresult == Z_BUF_ERROR && dest_cap < LONG_MAX) {
-				dest_cap *= 2;
-				compressed =
-					reftable_realloc(compressed, dest_cap);
-				if (compressed)
-					continue;
-			}
-
-			if (Z_OK != zresult) {
-				reftable_free(compressed);
-				return REFTABLE_ZLIB_ERROR;
-			}
-
-			memcpy(w->buf + block_header_skip, compressed,
-			       out_dest_len);
-			w->next = out_dest_len + block_header_skip;
+		uLongf src_len = w->next - block_header_skip, compressed_len;
+		unsigned char *compressed;
+		int ret;
+
+		ret = deflateReset(w->zstream);
+		if (ret != Z_OK)
+			return REFTABLE_ZLIB_ERROR;
+
+		/*
+		 * Precompute the upper bound of how many bytes the compressed
+		 * data may end up with. Combined with `Z_FINISH`, `deflate()`
+		 * is guaranteed to return `Z_STREAM_END`.
+		 */
+		compressed_len = deflateBound(w->zstream, src_len);
+		REFTABLE_ALLOC_ARRAY(compressed, compressed_len);
+
+		w->zstream->next_out = compressed;
+		w->zstream->avail_out = compressed_len;
+		w->zstream->next_in = w->buf + block_header_skip;
+		w->zstream->avail_in = src_len;
+
+		/*
+		 * We want to perform all decompression in a single
+		 * step, which is why we can pass Z_FINISH here. Note
+		 * that both `Z_OK` and `Z_BUF_ERROR` indicate that we
+		 * need to retry according to documentation.
+		 *
+		 * If the call fails we retry with a bigger output
+		 * buffer.
+		 */
+		ret = deflate(w->zstream, Z_FINISH);
+		if (ret != Z_STREAM_END) {
 			reftable_free(compressed);
-			break;
+			return REFTABLE_ZLIB_ERROR;
 		}
+
+		/*
+		 * Overwrite the uncompressed data we have already written and
+		 * adjust the `next` pointer to point right after the
+		 * compressed data.
+		 */
+		memcpy(w->buf + block_header_skip, compressed,
+		       w->zstream->total_out);
+		w->next = w->zstream->total_out + block_header_skip;
+
+		reftable_free(compressed);
 	}
+
 	return w->next;
 }
 
@@ -425,6 +450,8 @@  int block_reader_seek(struct block_reader *br, struct block_iter *it,
 
 void block_writer_release(struct block_writer *bw)
 {
+	deflateEnd(bw->zstream);
+	FREE_AND_NULL(bw->zstream);
 	FREE_AND_NULL(bw->restarts);
 	strbuf_release(&bw->last_key);
 	/* the block is not owned. */
diff --git a/reftable/block.h b/reftable/block.h
index 47acc62c0a..1375957fc8 100644
--- a/reftable/block.h
+++ b/reftable/block.h
@@ -18,6 +18,7 @@  license that can be found in the LICENSE file or at
  * allocation overhead.
  */
 struct block_writer {
+	z_stream *zstream;
 	uint8_t *buf;
 	uint32_t block_size;
 
diff --git a/reftable/writer.c b/reftable/writer.c
index d347ec4cc6..51e663bb19 100644
--- a/reftable/writer.c
+++ b/reftable/writer.c
@@ -153,6 +153,10 @@  void reftable_writer_free(struct reftable_writer *w)
 {
 	if (!w)
 		return;
+	if (w->block_writer) {
+		block_writer_release(w->block_writer);
+		w->block_writer = NULL;
+	}
 	reftable_free(w->block);
 	reftable_free(w);
 }