diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index f0e7af35c5..1998ea09dc 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -37,6 +37,7 @@
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.codec.ZstandardCodec;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 public class CodecFactory implements CompressionCodecFactory {
@@ -109,7 +110,17 @@ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOEx
           decompressor.reset();
         }
         InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
-        decompressed = BytesInput.from(is, uncompressedSize);
+
+        // We need to explicitly close the ZstdDecompressorStream here to release the resources it holds, and to
+        // avoid the off-heap memory fragmentation issue described in https://issues.apache.org/jira/browse/PARQUET-2160.
+        // This loads the decompressed bytes onto the heap slightly earlier than the lazy path below. Since the
+        // problem only occurs with the ZSTD codec, the eager copy-and-close is applied to ZSTD streams only.
+        if (codec instanceof ZstandardCodec) {
+          decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
+          is.close();
+        } else {
+          decompressed = BytesInput.from(is, uncompressedSize);
+        }
       } else {
         decompressed = bytes;
       }
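For context, here is a minimal, self-contained sketch of the resource-lifetime pattern the patch adopts: drain the decompression stream fully into a heap buffer of the known uncompressed size, then close the stream immediately so its (possibly native) resources are released deterministically, rather than whenever a downstream reader happens to finish. This is not part of the patch; it uses `java.util.zip.GZIPInputStream` as a stand-in because `ZstdDecompressorStream` comes from an external dependency, and the class and method names below are illustrative only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class EagerCloseSketch {

  // Drain the stream into a heap buffer, then close it immediately. This
  // mirrors the BytesInput.copy(...) + is.close() branch in the patch: the
  // close releases the codec's resources as soon as the bytes are
  // materialized, instead of at some later, unpredictable read time.
  static byte[] decompressEagerly(byte[] compressed, int uncompressedSize) throws IOException {
    try (InputStream is = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
      byte[] out = new byte[uncompressedSize];
      int off = 0;
      while (off < uncompressedSize) {
        int n = is.read(out, off, uncompressedSize - off);
        if (n < 0) {
          throw new IOException("stream ended after " + off + " of " + uncompressedSize + " bytes");
        }
        off += n;
      }
      return out;
    } // try-with-resources closes the stream here, deterministically
  }

  public static void main(String[] args) throws IOException {
    byte[] original = "release codec resources promptly".getBytes(StandardCharsets.UTF_8);
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(compressed)) {
      gz.write(original);
    }
    byte[] restored = decompressEagerly(compressed.toByteArray(), original.length);
    System.out.println(new String(restored, StandardCharsets.UTF_8));
  }
}
```

The tradeoff is the same one the patch comments describe: the decompressed bytes occupy heap as soon as the call returns instead of being streamed on demand, which is why the diff restricts the eager copy-and-close to ZSTD streams, where the off-heap fragmentation problem actually occurs.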