@@ -37,6 +37,7 @@
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.codec.ZstandardCodec;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;

 public class CodecFactory implements CompressionCodecFactory {
@@ -109,7 +110,17 @@ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException
         decompressor.reset();
       }
       InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
-      decompressed = BytesInput.from(is, uncompressedSize);

+      // We need to explicitly close the ZstdDecompressorStream here to release the resources it holds, to
+      // avoid the off-heap memory fragmentation issue described in https://issues.apache.org/jira/browse/PARQUET-2160.
+      // This loads the decompressed data onto the heap a little earlier; since the problem it solves only
+      // occurs in the ZSTD codec, the special case is limited to ZSTD streams.
+      if (codec instanceof ZstandardCodec) {
Contributor Author:

This looks a little weird, but doing so only loads the decompressed stream onto the heap in advance, and only ZSTD currently has this problem, so I made the modification just for ZSTD streams.

Contributor Author:

Maybe we could instead close the decompressed stream after it has been read:
https://github.com/apache/parquet-mr/blob/0819356a9dafd2ca07c5eab68e2bffeddc3bd3d9/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java#L283-L288
But I'm not sure whether there is any situation where the decompressed stream is read more than once.
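A minimal sketch of the two behaviors under discussion, assuming (as this thread does) that BytesInput.from(InputStream, int) stays stream-backed until consumed, while BytesInput.copy materializes the bytes immediately:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.parquet.bytes.BytesInput;

public class LazyVsEagerBytesInput {
  public static void main(String[] args) throws IOException {
    byte[] payload = {1, 2, 3, 4};
    InputStream is = new ByteArrayInputStream(payload);

    // Lazy: BytesInput.from(is, n) only wraps the stream; the bytes are pulled
    // when the BytesInput is consumed, so the stream must stay open until then.
    BytesInput lazy = BytesInput.from(is, payload.length);

    // Eager: copy() drains the input into a heap byte[] right away, after which
    // the original stream is no longer needed and can be closed immediately.
    BytesInput eager = BytesInput.copy(lazy);
    is.close();

    // The copied input is backed by a byte[] and can safely be read repeatedly,
    // which sidesteps the "read more than once" question for the ZSTD path.
    System.out.println(eager.toByteArray().length); // 4
    System.out.println(eager.toByteArray().length); // 4
  }
}
```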

Member:

The change looks OK to me; we should probably add a comment explaining why ZSTD deserves the special treatment here.

A change on BytesInput would be more intrusive, since it is used not only for decompression but also in other places, such as compression. For instance, BytesInput.copy calls toByteArray underneath, and after the call the original object should still be valid.
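For reference, BytesInput.copy is essentially a drain-to-heap operation; a sketch of its shape (not a verbatim quote of parquet-common):

```java
// Materialize a possibly stream-backed BytesInput into a heap byte[] so the
// result is re-readable and independent of the original stream's lifetime.
public static BytesInput copy(BytesInput bytesInput) throws IOException {
  return BytesInput.from(bytesInput.toByteArray());
}
```

This is also why closing the source stream right after the copy is safe in the ZSTD branch, while auto-closing inside BytesInput itself would change the contract for every other caller.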

Contributor Author:

Added a comment.

+        decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
Contributor (@shangxinli), Aug 21, 2022:

I understand from the discussion in the Jira that BytesInput.copy() just loads the data onto the heap in advance and does not add extra allocation overall. Can we have a benchmark on the heap/GC impact (heap size, GC time, etc.)? I just want to make sure we don't fix one problem while introducing another.

Other than that, treating ZSTD specially is probably fine, since we now have pretty decent comments.
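A minimal harness of the kind that could answer this (the benchmark actually run for this PR was posted later in the PR description; the workload below is a hypothetical placeholder):

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcProbe {
  // Sum collection counts and accumulated collection time across all collectors.
  private static long[] gcStats() {
    long count = 0, millis = 0;
    for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
      count += gc.getCollectionCount();
      millis += gc.getCollectionTime();
    }
    return new long[] {count, millis};
  }

  /** Runs the workload and prints the GC count / GC time deltas around it. */
  public static void measure(String label, Runnable workload) {
    long[] before = gcStats();
    workload.run();
    long[] after = gcStats();
    System.out.printf("%s: +%d GCs, +%d ms GC time, heap used %d MiB%n",
        label,
        after[0] - before[0],
        after[1] - before[1],
        (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) >> 20);
  }

  public static void main(String[] args) {
    // Hypothetical workload: a loop that decompresses pages through
    // CodecFactory, run once with and once without the BytesInput.copy change.
    measure("zstd-decompress", () -> { /* decompress loop goes here */ });
  }
}
```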

Contributor Author:

> Can we have a benchmark on the heap/GC impact (heap size, GC time, etc.)?

Sure, will do a benchmark.

Contributor Author:

Hi @shangxinli, very sorry about the long delay, I was a little busy last week. The benchmark results and detailed data have been posted in the PR description; also cc @sunchao.

Contributor:

Thanks for working on it!

+        is.close();
+      } else {
+        decompressed = BytesInput.from(is, uncompressedSize);
+      }
     } else {
       decompressed = bytes;
     }
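Read end to end, the decompress path from the hunk above merges to roughly the following (surrounding context elided):

```java
InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);

// Close the ZstdDecompressorStream explicitly to release the off-heap
// resources it holds (PARQUET-2160). BytesInput.copy drains the stream into
// a heap byte[] a little earlier than the lazy path, so the special case is
// limited to ZSTD, the only codec where the fragmentation issue occurs.
if (codec instanceof ZstandardCodec) {
  decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
  is.close();
} else {
  decompressed = BytesInput.from(is, uncompressedSize);
}
```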