diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java
index 47b9d158c509..bfcece6259a6 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java
@@ -18,88 +18,75 @@
  */
 package org.apache.iceberg.parquet;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.parquet.bytes.BytesInput;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.parquet.hadoop.BadConfigurationException;
 import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.codec.ZstandardCodec;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 /**
  * This class implements a codec factory that is used when reading from Parquet. It adds a
- * workaround for memory issues encountered when reading from zstd-compressed files. This is no
- * longer used, as Parquet 1.13 includes this fix.
- *
- * @deprecated will be removed in 1.5.0
+ * workaround to cache codecs by name and level, not just by name. This can be removed when this
+ * change is made to Parquet.
  */
-@Deprecated
 public class ParquetCodecFactory extends CodecFactory {
 
   public ParquetCodecFactory(Configuration configuration, int pageSize) {
     super(configuration, pageSize);
   }
 
-  /** Copied and modified from CodecFactory.HeapBytesDecompressor */
-  class HeapBytesDecompressor extends BytesDecompressor {
-
-    private final CompressionCodec codec;
-    private final Decompressor decompressor;
-
-    HeapBytesDecompressor(CompressionCodecName codecName) {
-      this.codec = getCodec(codecName);
-      if (codec != null) {
-        decompressor = CodecPool.getDecompressor(codec);
-      } else {
-        decompressor = null;
-      }
-    }
-
-    @Override
-    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
-      final BytesInput decompressed;
-      if (codec != null) {
-        if (decompressor != null) {
-          decompressor.reset();
-        }
-        if (codec instanceof ZstandardCodec) {
-          // we need to close the zstd input stream ASAP to free up native resources, so
-          // read everything into a buffer and then close it
-          try (InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor)) {
-            decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
-          }
-        } else {
-          InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
-          decompressed = BytesInput.from(is, uncompressedSize);
-        }
-      } else {
-        decompressed = bytes;
-      }
-      return decompressed;
+  /**
+   * This is copied from {@link CodecFactory} and modified to include the level in the cache key.
+   */
+  @Override
+  protected CompressionCodec getCodec(CompressionCodecName codecName) {
+    String codecClassName = codecName.getHadoopCompressionCodecClassName();
+    if (codecClassName == null) {
+      return null;
     }
-
-    @Override
-    public void decompress(
-        ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
-        throws IOException {
-      ByteBuffer decompressed = decompress(BytesInput.from(input), uncompressedSize).toByteBuffer();
-      output.put(decompressed);
+    String cacheKey = cacheKey(codecName);
+    CompressionCodec codec = CODEC_BY_NAME.get(cacheKey);
+    if (codec != null) {
+      return codec;
     }
 
-    @Override
-    public void release() {
-      if (decompressor != null) {
-        CodecPool.returnDecompressor(decompressor);
+    try {
+      Class<?> codecClass;
+      try {
+        codecClass = Class.forName(codecClassName);
+      } catch (ClassNotFoundException e) {
+        // Try to load the class using the job classloader
+        codecClass = configuration.getClassLoader().loadClass(codecClassName);
       }
+      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, configuration);
+      CODEC_BY_NAME.put(cacheKey, codec);
+      return codec;
+    } catch (ClassNotFoundException e) {
+      throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
     }
   }
 
-  @Override
-  protected BytesDecompressor createDecompressor(CompressionCodecName codecName) {
-    return new HeapBytesDecompressor(codecName);
+  private String cacheKey(CompressionCodecName codecName) {
+    String level = null;
+    switch (codecName) {
+      case GZIP:
+        level = configuration.get("zlib.compress.level");
+        break;
+      case BROTLI:
+        level = configuration.get("compression.brotli.quality");
+        break;
+      case ZSTD:
+        level = configuration.get("parquet.compression.codec.zstd.level");
+        if (level == null) {
+          // keep "io.compression.codec.zstd.level" for backwards compatibility
+          level = configuration.get("io.compression.codec.zstd.level");
+        }
+        break;
+      default:
+        // compression level is not supported; ignore it
+    }
+    String codecClass = codecName.getHadoopCompressionCodecClassName();
+    return level == null ? codecClass : codecClass + ":" + level;
   }
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 577004993711..099cffc33bb8 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -86,7 +86,8 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
     this.targetRowGroupSize = rowGroupSize;
     this.props = properties;
     this.metadata = ImmutableMap.copyOf(metadata);
-    this.compressor = new CodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec);
+    this.compressor =
+        new ParquetCodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec);
     this.parquetSchema = ParquetSchemaUtil.convert(schema, "table");
     this.model = (ParquetValueWriter<T>) createWriterFunc.apply(parquetSchema);
     this.metricsConfig = metricsConfig;
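
Note on the behavior above (not part of the patch): with the compression level folded into the cache key, two Configurations that differ only in level resolve to separate cached codec instances instead of sharing whichever codec was created first. The sketch below is a hypothetical, package-local illustration; the class name and the page size value are made up, and it relies on getCodec being protected in CodecFactory, so same-package code can call it directly.

package org.apache.iceberg.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// Hypothetical sketch, not part of this patch: differing zstd levels now map to
// distinct cache keys (org.apache.parquet.hadoop.codec.ZstandardCodec:3 vs :19).
public class ParquetCodecFactoryLevelSketch {
  public static void main(String[] args) {
    Configuration level3 = new Configuration();
    level3.set("parquet.compression.codec.zstd.level", "3");

    Configuration level19 = new Configuration();
    level19.set("parquet.compression.codec.zstd.level", "19");

    // getCodec(CompressionCodecName) is protected in CodecFactory, so this
    // sketch sits in org.apache.iceberg.parquet to be able to call it.
    CompressionCodec zstd3 =
        new ParquetCodecFactory(level3, 64 * 1024).getCodec(CompressionCodecName.ZSTD);
    CompressionCodec zstd19 =
        new ParquetCodecFactory(level19, 64 * 1024).getCodec(CompressionCodecName.ZSTD);

    // Before this change both lookups shared one CODEC_BY_NAME entry; with the
    // level in the key, each Configuration gets its own codec instance.
    System.out.println(zstd3 == zstd19); // expected: false
  }
}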