Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-817, ORC-1088: Support ZStandard compression using zstd-jni #1743

Closed
wants to merge 28 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions java/core/pom.xml
Original file line number Diff line number Diff line change
@@ -51,6 +51,10 @@
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
8 changes: 8 additions & 0 deletions java/core/src/java/org/apache/orc/OrcConf.java
Original file line number Diff line number Diff line change
@@ -72,6 +72,14 @@ public enum OrcConf {
"Define the compression strategy to use while writing data.\n" +
"This changes the compression level of higher level compression\n" +
"codec (like ZLIB)."),
COMPRESSION_ZSTD_LEVEL("orc.compression.zstd.level",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change also fixes ORC-1088.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Source table: ORC with zlib, 4408374802439 bytes (about 4 TB)

zstd-jni

orc.compression.zstd.level=3 (default)
zstd compressed size: 3119313447131 bytes (2905G)

orc.compression.zstd.level=10
zstd compressed size: 2621369844393 bytes (2441G)

aircompressor

zstd compressed size: 3138804372295 bytes (2923G)

"hive.exec.orc.compression.zstd.level", 1,
"Define the compression level to use with ZStandard codec "
+ "while writing data. The valid range is 1~22"),
COMPRESSION_ZSTD_WINDOWLOG("orc.compression.zstd.windowlog",
"hive.exec.orc.compression.zstd.windowlog", 0,
"Set the maximum allowed back-reference distance for "
+ "ZStandard codec, expressed as power of 2."),
BLOCK_PADDING_TOLERANCE("orc.block.padding.tolerance",
"hive.exec.orc.block.padding.tolerance", 0.05,
"Define the tolerance for block padding as a decimal fraction of\n" +
32 changes: 32 additions & 0 deletions java/core/src/java/org/apache/orc/OrcFile.java
Original file line number Diff line number Diff line change
@@ -426,6 +426,27 @@ public static BloomFilterVersion fromString(String s) {
}
}

/**
 * Mutable holder for the ZStandard-specific writer settings: a compression
 * level and a window log. Both values default to 0 until a setter is called;
 * no validation is performed on the values stored here.
 */
public static class ZstdCompressOptions {
  /** ZStandard compression level. */
  private int level;
  /** ZStandard window log. */
  private int windowLog;

  /**
   * @return the ZStandard compression level currently stored
   */
  public int getCompressionZstdLevel() {
    return this.level;
  }

  /**
   * Stores the ZStandard compression level.
   *
   * @param compressionZstdLevel the level to store
   */
  public void setCompressionZstdLevel(int compressionZstdLevel) {
    this.level = compressionZstdLevel;
  }

  /**
   * @return the ZStandard window log currently stored
   */
  public int getCompressionZstdWindowLog() {
    return this.windowLog;
  }

  /**
   * Stores the ZStandard window log.
   *
   * @param compressionZstdWindowLog the window log to store
   */
  public void setCompressionZstdWindowLog(int compressionZstdWindowLog) {
    this.windowLog = compressionZstdWindowLog;
  }
}

/**
* Options for creating ORC file writers.
*/
@@ -447,6 +468,7 @@ public static class WriterOptions implements Cloneable {
private WriterCallback callback;
private EncodingStrategy encodingStrategy;
private CompressionStrategy compressionStrategy;
private ZstdCompressOptions zstdCompressOptions;
private double paddingTolerance;
private String bloomFilterColumns;
private double bloomFilterFpp;
@@ -493,6 +515,12 @@ protected WriterOptions(Properties tableProperties, Configuration conf) {
OrcConf.COMPRESSION_STRATEGY.getString(tableProperties, conf);
compressionStrategy = CompressionStrategy.valueOf(compString);

zstdCompressOptions = new ZstdCompressOptions();
zstdCompressOptions.setCompressionZstdLevel(
OrcConf.COMPRESSION_ZSTD_LEVEL.getInt(tableProperties, conf));
zstdCompressOptions.setCompressionZstdWindowLog(
OrcConf.COMPRESSION_ZSTD_WINDOWLOG.getInt(tableProperties, conf));

paddingTolerance =
OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(tableProperties, conf);

@@ -938,6 +966,10 @@ public EncodingStrategy getEncodingStrategy() {
return encodingStrategy;
}

/**
 * @return the ZStandard compression options held by these writer options
 */
public ZstdCompressOptions getZstdCompressOptions() {
  return this.zstdCompressOptions;
}

/**
 * @return the block padding tolerance held by these writer options
 */
public double getPaddingTolerance() {
  return this.paddingTolerance;
}
12 changes: 11 additions & 1 deletion java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
Original file line number Diff line number Diff line change
@@ -115,8 +115,18 @@ public PhysicalFsWriter(FSDataOutputStream outputStream,
}
CompressionCodec codec = OrcCodecPool.getCodec(opts.getCompress());
if (codec != null){
compress.withCodec(codec, codec.getDefaultOptions());
CompressionCodec.Options tempOptions = codec.getDefaultOptions();
if (codec instanceof ZstdCodec &&
codec.getDefaultOptions() instanceof ZstdCodec.ZstdOptions options) {
OrcFile.ZstdCompressOptions zstdCompressOptions = opts.getZstdCompressOptions();
if (zstdCompressOptions != null) {
options.setLevel(zstdCompressOptions.getCompressionZstdLevel());
options.setWindowLog(zstdCompressOptions.getCompressionZstdWindowLog());
}
}
compress.withCodec(codec, tempOptions);
}

this.compressionStrategy = opts.getCompressionStrategy();
this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize);
this.blockSize = opts.getBlockSize();
24 changes: 22 additions & 2 deletions java/core/src/java/org/apache/orc/impl/WriterImpl.java
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@

package org.apache.orc.impl;

import com.github.luben.zstd.util.Native;
import com.google.protobuf.ByteString;
import io.airlift.compress.lz4.Lz4Compressor;
import io.airlift.compress.lz4.Lz4Decompressor;
@@ -273,6 +274,17 @@ private static int getClosestBufferSize(int size) {
return Math.min(kb256, Math.max(kb4, pow2));
}

// Class initializer: attempt to load the zstd-jni native library, unless the
// "orc.compression.zstd.impl" system property is set to "java"
// (case-insensitive), in which case no native load is attempted.
static {
  try {
    if (!"java".equalsIgnoreCase(System.getProperty("orc.compression.zstd.impl"))) {
      Native.load();
    }
  } catch (UnsatisfiedLinkError | ExceptionInInitializerError e) {
    // Best effort: a failed native load must not break class initialization.
    // The throwable is passed to the logger so the reason the library could
    // not be loaded is visible instead of being silently dropped.
    LOG.warn("Unable to load zstd-jni library for your platform. " +
        "Using builtin-java classes where applicable", e);
  }
}

public static CompressionCodec createCodec(CompressionKind kind) {
switch (kind) {
case NONE:
@@ -288,8 +300,16 @@ public static CompressionCodec createCodec(CompressionKind kind) {
return new AircompressorCodec(kind, new Lz4Compressor(),
new Lz4Decompressor());
case ZSTD:
return new AircompressorCodec(kind, new ZstdCompressor(),
new ZstdDecompressor());
if ("java".equalsIgnoreCase(System.getProperty("orc.compression.zstd.impl"))) {
return new AircompressorCodec(kind, new ZstdCompressor(),
new ZstdDecompressor());
}
if (Native.isLoaded()) {
return new ZstdCodec();
} else {
return new AircompressorCodec(kind, new ZstdCompressor(),
new ZstdDecompressor());
}
case BROTLI:
return new BrotliCodec();
default:
Loading