diff --git a/parquet-hadoop/README.md b/parquet-hadoop/README.md index 2ad1e094a1..e40c93c047 100644 --- a/parquet-hadoop/README.md +++ b/parquet-hadoop/README.md @@ -338,6 +338,12 @@ ParquetInputFormat to materialize records. It should be a the descendant class o ## Class: ZstandardCodec +**Property:** `parquet.compression.codec.zstd.bufferPool.enabled` +**Description:** If `true`, [RecyclingBufferPool](https://github.com/luben/zstd-jni/blob/master/src/main/java/com/github/luben/zstd/RecyclingBufferPool.java) is used for zstd-jni's internal buffers; otherwise no buffer pool is used. +**Default value:** `false` + +--- + **Property:** `parquet.compression.codec.zstd.level` **Description:** The compression level of ZSTD. The valid range is 1~22. Generally the higher compression level, the higher compression ratio can be achieved, but the writing time will be longer. **Default value:** `3` diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstandardCodec.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstandardCodec.java index 0409cf2a17..73908da91e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstandardCodec.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstandardCodec.java @@ -18,6 +18,9 @@ */ package org.apache.parquet.hadoop.codec; +import com.github.luben.zstd.BufferPool; +import com.github.luben.zstd.NoPool; +import com.github.luben.zstd.RecyclingBufferPool; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CompressionCodec; @@ -43,6 +46,8 @@ */ public class ZstandardCodec implements Configurable, CompressionCodec { + public final static String PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED = "parquet.compression.codec.zstd.bufferPool.enabled"; + public final static boolean DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED = false; public final static String PARQUET_COMPRESS_ZSTD_LEVEL = "parquet.compression.codec.zstd.level"; public final static int 
DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL = 3; public final static String PARQUET_COMPRESS_ZSTD_WORKERS = "parquet.compression.codec.zstd.workers"; @@ -80,7 +85,13 @@ public CompressionInputStream createInputStream(InputStream stream, Decompressor @Override public CompressionInputStream createInputStream(InputStream stream) throws IOException { - return new ZstdDecompressorStream(stream); + BufferPool pool; + if (conf.getBoolean(PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED)) { + pool = RecyclingBufferPool.INSTANCE; + } else { + pool = NoPool.INSTANCE; + } + return new ZstdDecompressorStream(stream, pool); } @Override @@ -91,7 +102,14 @@ public CompressionOutputStream createOutputStream(OutputStream stream, Compresso @Override public CompressionOutputStream createOutputStream(OutputStream stream) throws IOException { - return new ZstdCompressorStream(stream, conf.getInt(PARQUET_COMPRESS_ZSTD_LEVEL, DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL), + BufferPool pool; + if (conf.getBoolean(PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED)) { + pool = RecyclingBufferPool.INSTANCE; + } else { + pool = NoPool.INSTANCE; + } + return new ZstdCompressorStream(stream, pool, + conf.getInt(PARQUET_COMPRESS_ZSTD_LEVEL, DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL), conf.getInt(PARQUET_COMPRESS_ZSTD_WORKERS, DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS)); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdCompressorStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdCompressorStream.java index f9b9210430..bfea804095 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdCompressorStream.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdCompressorStream.java @@ -18,6 +18,7 @@ */ package org.apache.parquet.hadoop.codec; +import com.github.luben.zstd.BufferPool; import com.github.luben.zstd.ZstdOutputStream; import 
org.apache.hadoop.io.compress.CompressionOutputStream; @@ -34,6 +35,13 @@ public ZstdCompressorStream(OutputStream stream, int level, int workers) throws zstdOutputStream.setWorkers(workers); } + public ZstdCompressorStream(OutputStream stream, BufferPool pool, int level, int workers) throws IOException { + super(stream); + zstdOutputStream = new ZstdOutputStream(stream, pool); + zstdOutputStream.setLevel(level); + zstdOutputStream.setWorkers(workers); + } + public void write(byte[] b, int off, int len) throws IOException { zstdOutputStream.write(b, off, len); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdDecompressorStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdDecompressorStream.java index 34d4849ef1..a505e7bde0 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdDecompressorStream.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdDecompressorStream.java @@ -18,6 +18,7 @@ */ package org.apache.parquet.hadoop.codec; +import com.github.luben.zstd.BufferPool; import com.github.luben.zstd.ZstdInputStream; import org.apache.hadoop.io.compress.CompressionInputStream; @@ -33,6 +34,11 @@ public ZstdDecompressorStream(InputStream stream) throws IOException { zstdInputStream = new ZstdInputStream(stream); } + public ZstdDecompressorStream(InputStream stream, BufferPool pool) throws IOException { + super(stream); + zstdInputStream = new ZstdInputStream(stream, pool); + } + public int read(byte[] b, int off, int len) throws IOException { return zstdInputStream.read(b, off, len); } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestZstandardCodec.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestZstandardCodec.java index 289a6aaf0b..c0d98266f7 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestZstandardCodec.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestZstandardCodec.java @@ 
-18,6 +18,9 @@ */ package org.apache.parquet.hadoop; +import com.github.luben.zstd.BufferPool; +import com.github.luben.zstd.NoPool; +import com.github.luben.zstd.RecyclingBufferPool; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -56,14 +59,18 @@ public class TestZstandardCodec { public void testZstdCodec() throws IOException { ZstandardCodec codec = new ZstandardCodec(); Configuration conf = new Configuration(); + boolean[] pools = {false, true}; int[] levels = {1, 4, 7, 10, 13, 16, 19, 22}; int[] dataSizes = {0, 1, 10, 1024, 1024 * 1024}; - for (int i = 0; i < levels.length; i++) { - conf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, levels[i]); - codec.setConf(conf); - for (int j = 0; j < dataSizes.length; j++) { - testZstd(codec, dataSizes[j]); + for (boolean pool: pools) { + for (int i = 0; i < levels.length; i++) { + conf.setBoolean(ZstandardCodec.PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, pool); + conf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, levels[i]); + codec.setConf(conf); + for (int j = 0; j < dataSizes.length; j++) { + testZstd(codec, dataSizes[j]); + } } } }