Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions parquet-hadoop/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,12 @@ ParquetInputFormat to materialize records. It should be a descendant class of

## Class: ZstandardCodec

**Property:** `parquet.compression.codec.zstd.bufferPool.enabled`
**Description:** If set to `true`, [RecyclingBufferPool](https://github.com/luben/zstd-jni/blob/master/src/main/java/com/github/luben/zstd/RecyclingBufferPool.java) is used to recycle the internal buffers of the ZSTD streams; otherwise no buffer pooling is performed.
**Default value:** `false`

---

**Property:** `parquet.compression.codec.zstd.level`
**Description:** The compression level of ZSTD. The valid range is 1~22. Generally, the higher the compression level, the better the compression ratio, but the longer the write time.
**Default value:** `3`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.parquet.hadoop.codec;

import com.github.luben.zstd.BufferPool;
import com.github.luben.zstd.NoPool;
import com.github.luben.zstd.RecyclingBufferPool;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
Expand All @@ -43,6 +46,8 @@
*/
public class ZstandardCodec implements Configurable, CompressionCodec {

// Configuration key: when true, compressor/decompressor streams draw their internal
// buffers from zstd-jni's RecyclingBufferPool instead of allocating fresh ones per stream.
public final static String PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED = "parquet.compression.codec.zstd.bufferPool.enabled";
// Buffer pooling is off by default, preserving the previous allocation behavior.
public final static boolean DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED = false;
// Configuration key for the ZSTD compression level (valid range 1~22 per the README).
public final static String PARQUET_COMPRESS_ZSTD_LEVEL = "parquet.compression.codec.zstd.level";
public final static int DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL = 3;
// Configuration key for the number of ZSTD worker threads used for compression.
public final static String PARQUET_COMPRESS_ZSTD_WORKERS = "parquet.compression.codec.zstd.workers";
Expand Down Expand Up @@ -80,7 +85,13 @@ public CompressionInputStream createInputStream(InputStream stream, Decompressor

@Override
public CompressionInputStream createInputStream(InputStream stream) throws IOException {
  // Choose the zstd-jni buffer pool from configuration: the recycling pool reuses
  // internal buffers across streams; NoPool keeps the original allocate-per-stream
  // behavior and is the default for backward compatibility.
  BufferPool pool;
  if (conf.getBoolean(PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED)) {
    pool = RecyclingBufferPool.INSTANCE;
  } else {
    pool = NoPool.INSTANCE;
  }
  return new ZstdDecompressorStream(stream, pool);
}

@Override
Expand All @@ -91,7 +102,14 @@ public CompressionOutputStream createOutputStream(OutputStream stream, Compresso

@Override
public CompressionOutputStream createOutputStream(OutputStream stream) throws IOException {
return new ZstdCompressorStream(stream, conf.getInt(PARQUET_COMPRESS_ZSTD_LEVEL, DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL),
BufferPool pool;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Dongjoon for working on this!

It is kind of late. Just a minor comment: if you can wrap the code into a method and call it in both CompressionInputStream() and CompressionOutputStream, it would avoid duplicating. Not a big deal though.

if (conf.getBoolean(PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED)) {
pool = RecyclingBufferPool.INSTANCE;
} else {
pool = NoPool.INSTANCE;
}
return new ZstdCompressorStream(stream, pool,
conf.getInt(PARQUET_COMPRESS_ZSTD_LEVEL, DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL),
conf.getInt(PARQUET_COMPRESS_ZSTD_WORKERS, DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.parquet.hadoop.codec;

import com.github.luben.zstd.BufferPool;
import com.github.luben.zstd.ZstdOutputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;

Expand All @@ -34,6 +35,13 @@ public ZstdCompressorStream(OutputStream stream, int level, int workers) throws
zstdOutputStream.setWorkers(workers);
}

/**
 * Creates a compressor stream backed by zstd-jni's {@code ZstdOutputStream},
 * drawing internal buffers from the supplied {@code BufferPool}.
 *
 * @param stream  the underlying output stream compressed bytes are written to
 * @param pool    buffer pool for the zstd stream (e.g. RecyclingBufferPool.INSTANCE
 *                to reuse buffers, NoPool.INSTANCE for plain allocation)
 * @param level   ZSTD compression level
 * @param workers number of ZSTD worker threads
 * @throws IOException if the underlying zstd stream cannot be created
 */
public ZstdCompressorStream(OutputStream stream, BufferPool pool, int level, int workers) throws IOException {
super(stream);
zstdOutputStream = new ZstdOutputStream(stream, pool);
zstdOutputStream.setLevel(level);
zstdOutputStream.setWorkers(workers);
}

// Delegates the write directly to the wrapped zstd output stream.
public void write(byte[] b, int off, int len) throws IOException {
zstdOutputStream.write(b, off, len);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.parquet.hadoop.codec;

import com.github.luben.zstd.BufferPool;
import com.github.luben.zstd.ZstdInputStream;
import org.apache.hadoop.io.compress.CompressionInputStream;

Expand All @@ -33,6 +34,11 @@ public ZstdDecompressorStream(InputStream stream) throws IOException {
zstdInputStream = new ZstdInputStream(stream);
}

/**
 * Creates a decompressor stream backed by zstd-jni's {@code ZstdInputStream},
 * drawing internal buffers from the supplied {@code BufferPool}.
 *
 * @param stream the underlying input stream of ZSTD-compressed bytes
 * @param pool   buffer pool for the zstd stream (e.g. RecyclingBufferPool.INSTANCE
 *               to reuse buffers, NoPool.INSTANCE for plain allocation)
 * @throws IOException if the underlying zstd stream cannot be created
 */
public ZstdDecompressorStream(InputStream stream, BufferPool pool) throws IOException {
super(stream);
zstdInputStream = new ZstdInputStream(stream, pool);
}

// Delegates the read directly to the wrapped zstd input stream.
public int read(byte[] b, int off, int len) throws IOException {
return zstdInputStream.read(b, off, len);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.parquet.hadoop;

import com.github.luben.zstd.BufferPool;
import com.github.luben.zstd.NoPool;
import com.github.luben.zstd.RecyclingBufferPool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -56,14 +59,18 @@ public class TestZstandardCodec {
public void testZstdCodec() throws IOException {
ZstandardCodec codec = new ZstandardCodec();
Configuration conf = new Configuration();
boolean[] pools = {false, true};
int[] levels = {1, 4, 7, 10, 13, 16, 19, 22};
int[] dataSizes = {0, 1, 10, 1024, 1024 * 1024};

for (int i = 0; i < levels.length; i++) {
conf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, levels[i]);
codec.setConf(conf);
for (int j = 0; j < dataSizes.length; j++) {
testZstd(codec, dataSizes[j]);
for (boolean pool: pools) {
for (int i = 0; i < levels.length; i++) {
conf.setBoolean(ZstandardCodec.PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, pool);
conf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, levels[i]);
codec.setConf(conf);
for (int j = 0; j < dataSizes.length; j++) {
testZstd(codec, dataSizes[j]);
}
}
}
}
Expand Down