Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions hadoop-common-project/hadoop-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@
<artifactId>wildfly-openssl-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public interface Decompressor {
* {@link #needsInput()} should be called in order to determine if more
* input data is required.
*
* @param b Buffer for the compressed data
* @param b Buffer for the uncompressed data
* @param off Start offset of the data
* @param len Size of the buffer
* @return The actual number of bytes of uncompressed data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.util.NativeCodeLoader;

/**
* This class creates snappy compressors/decompressors.
Expand Down Expand Up @@ -56,37 +55,6 @@ public Configuration getConf() {
return conf;
}

/**
* Are the native snappy libraries loaded &amp; initialized?
*/
public static void checkNativeCodeLoaded() {
if (!NativeCodeLoader.buildSupportsSnappy()) {
throw new RuntimeException("native snappy library not available: " +
"this version of libhadoop was built without " +
"snappy support.");
}
if (!NativeCodeLoader.isNativeCodeLoaded()) {
throw new RuntimeException("Failed to load libhadoop.");
}
if (!SnappyCompressor.isNativeCodeLoaded()) {
throw new RuntimeException("native snappy library not available: " +
"SnappyCompressor has not been loaded.");
}
if (!SnappyDecompressor.isNativeCodeLoaded()) {
throw new RuntimeException("native snappy library not available: " +
"SnappyDecompressor has not been loaded.");
}
}

public static boolean isNativeCodeLoaded() {
return SnappyCompressor.isNativeCodeLoaded() &&
SnappyDecompressor.isNativeCodeLoaded();
}

public static String getLibraryName() {
return SnappyCompressor.getLibraryName();
}

/**
* Create a {@link CompressionOutputStream} that will write to the given
* {@link OutputStream}.
Expand Down Expand Up @@ -115,7 +83,6 @@ public CompressionOutputStream createOutputStream(OutputStream out)
public CompressionOutputStream createOutputStream(OutputStream out,
Compressor compressor)
throws IOException {
checkNativeCodeLoaded();
int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
Expand All @@ -133,7 +100,6 @@ public CompressionOutputStream createOutputStream(OutputStream out,
*/
@Override
public Class<? extends Compressor> getCompressorType() {
checkNativeCodeLoaded();
return SnappyCompressor.class;
}

Expand All @@ -144,7 +110,6 @@ public Class<? extends Compressor> getCompressorType() {
*/
@Override
public Compressor createCompressor() {
checkNativeCodeLoaded();
int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
Expand Down Expand Up @@ -179,7 +144,6 @@ public CompressionInputStream createInputStream(InputStream in)
public CompressionInputStream createInputStream(InputStream in,
Decompressor decompressor)
throws IOException {
checkNativeCodeLoaded();
return new BlockDecompressorStream(in, decompressor, conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
Expand All @@ -192,7 +156,6 @@ public CompressionInputStream createInputStream(InputStream in,
*/
@Override
public Class<? extends Decompressor> getDecompressorType() {
checkNativeCodeLoaded();
return SnappyDecompressor.class;
}

Expand All @@ -203,7 +166,6 @@ public Class<? extends Decompressor> getDecompressorType() {
*/
@Override
public Decompressor createDecompressor() {
checkNativeCodeLoaded();
int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
Expand All @@ -215,7 +177,7 @@ public Decompressor createDecompressor() {
*/
@Override
public DirectDecompressor createDirectDecompressor() {
return isNativeCodeLoaded() ? new SnappyDirectDecompressor() : null;
return new SnappyDirectDecompressor();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.util.NativeCodeLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;

/**
* A {@link Compressor} based on the snappy compression algorithm.
Expand All @@ -48,24 +48,6 @@ public class SnappyCompressor implements Compressor {
private long bytesRead = 0L;
private long bytesWritten = 0L;

private static boolean nativeSnappyLoaded = false;

static {
if (NativeCodeLoader.isNativeCodeLoaded() &&
NativeCodeLoader.buildSupportsSnappy()) {
try {
initIDs();
nativeSnappyLoaded = true;
} catch (Throwable t) {
LOG.error("failed to load SnappyCompressor", t);
}
}
}

public static boolean isNativeCodeLoaded() {
return nativeSnappyLoaded;
}

/**
* Creates a new compressor.
*
Expand Down Expand Up @@ -291,9 +273,14 @@ public long getBytesWritten() {
public void end() {
}

private native static void initIDs();

private native int compressBytesDirect();

public native static String getLibraryName();
/**
 * Compresses the pending contents of {@code uncompressedDirectBuf} into
 * {@code compressedDirectBuf} using the pure-Java snappy-java library
 * (replacing the former native JNI implementation).
 *
 * @return the number of compressed bytes written into
 *         {@code compressedDirectBuf}, or 0 when there is no buffered input
 * @throws IOException if snappy-java fails to compress the buffer
 */
private int compressBytesDirect() throws IOException {
if (uncompressedDirectBufLen == 0) {
// Nothing buffered — avoid calling Snappy on an empty region.
return 0;
} else {
// Set the position and limit of `uncompressedDirectBuf` for reading
uncompressedDirectBuf.position(0).limit(uncompressedDirectBufLen);
// Casts suggest the fields are declared as java.nio.Buffer rather than
// ByteBuffer — NOTE(review): confirm against the field declarations.
return Snappy.compress((ByteBuffer) uncompressedDirectBuf,
(ByteBuffer) compressedDirectBuf);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@

import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DirectDecompressor;
import org.apache.hadoop.util.NativeCodeLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;

/**
* A {@link Decompressor} based on the snappy compression algorithm.
Expand All @@ -45,24 +45,6 @@ public class SnappyDecompressor implements Decompressor {
private int userBufOff = 0, userBufLen = 0;
private boolean finished;

private static boolean nativeSnappyLoaded = false;

static {
if (NativeCodeLoader.isNativeCodeLoaded() &&
NativeCodeLoader.buildSupportsSnappy()) {
try {
initIDs();
nativeSnappyLoaded = true;
} catch (Throwable t) {
LOG.error("failed to load SnappyDecompressor", t);
}
}
}

public static boolean isNativeCodeLoaded() {
return nativeSnappyLoaded;
}

/**
* Creates a new decompressor.
*
Expand Down Expand Up @@ -201,7 +183,7 @@ public boolean finished() {
* {@link #needsInput()} should be called in order to determine if more
* input data is required.
*
* @param b Buffer for the compressed data
* @param b Buffer for the uncompressed data
* @param off Start offset of the data
* @param len Size of the buffer
* @return The actual number of bytes of uncompressed data.
Expand Down Expand Up @@ -276,13 +258,27 @@ public void end() {
// do nothing
}

private native static void initIDs();
private int decompressBytesDirect() throws IOException {
if (compressedDirectBufLen == 0) {
return 0;
} else {
// Set the position and limit of `compressedDirectBuf` for reading
compressedDirectBuf.position(0).limit(compressedDirectBufLen);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we need to set position and limit here. If compressedDirectBuf has already been set up with a position and limit before decompressBytesDirect is called, won't we read the wrong data from this buffer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decompressBytesDirect is the only method that reads data from compressedDirectBuf, so I think we should read it from the beginning and fully decompress the whole buffer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For decompressDirect, compressedDirectBuf is set up correctly before calling decompressBytesDirect. So we don't need to do it here again. For decompress, I'm not sure, but looks like setInputFromSavedData also takes care about compressedDirectBuf reseting. I guess we don't need to do it in decompressBytesDirect.

// There is compressed input, decompress it now.
int size = Snappy.uncompressedLength((ByteBuffer) compressedDirectBuf);
if (size > uncompressedDirectBuf.capacity()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check with uncompressedDirectBuf.remaining instead of capacity?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before calling decompressBytesDirect, we always reset uncompressedDirectBuf, so both methods return the same result. But you are right, I think .remaining is better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decompressDirect also calls decompressBytesDirect, but the uncompressedDirectBuf is passed in as an argument. I think it is dangerous to assume uncompressedDirectBuf is always reset.

throw new IOException("Could not decompress data. " +
"uncompressedDirectBuf length is too small.");
}
size = Snappy.uncompress((ByteBuffer) compressedDirectBuf,
(ByteBuffer) uncompressedDirectBuf);
return size;
}
}

private native int decompressBytesDirect();

int decompressDirect(ByteBuffer src, ByteBuffer dst) throws IOException {
assert (this instanceof SnappyDirectDecompressor);

ByteBuffer presliced = dst;
if (dst.position() > 0) {
presliced = dst;
Expand Down Expand Up @@ -311,10 +307,10 @@ int decompressDirect(ByteBuffer src, ByteBuffer dst) throws IOException {
}
return n;
}

public static class SnappyDirectDecompressor extends SnappyDecompressor implements
DirectDecompressor {

@Override
public boolean finished() {
return (endOfInput && super.finished());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,6 @@ public static boolean isNativeCodeLoaded() {
return nativeCodeLoaded;
}

/**
* Returns true only if this build was compiled with support for snappy.
*/
public static native boolean buildSupportsSnappy();

/**
* Returns true only if this build was compiled with support for ISA-L.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.OpensslCipher;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -67,7 +66,6 @@ public static void main(String[] args) {
Configuration conf = new Configuration();
boolean nativeHadoopLoaded = NativeCodeLoader.isNativeCodeLoaded();
boolean zlibLoaded = false;
boolean snappyLoaded = false;
boolean isalLoaded = false;
boolean zStdLoaded = false;
boolean pmdkLoaded = false;
Expand All @@ -80,7 +78,6 @@ public static void main(String[] args) {
String openSslDetail = "";
String hadoopLibraryName = "";
String zlibLibraryName = "";
String snappyLibraryName = "";
String isalDetail = "";
String pmdkDetail = "";
String zstdLibraryName = "";
Expand All @@ -99,11 +96,6 @@ public static void main(String[] args) {
if (zStdLoaded && NativeCodeLoader.buildSupportsZstd()) {
zstdLibraryName = ZStandardCodec.getLibraryName();
}
snappyLoaded = NativeCodeLoader.buildSupportsSnappy() &&
SnappyCodec.isNativeCodeLoaded();
if (snappyLoaded && NativeCodeLoader.buildSupportsSnappy()) {
snappyLibraryName = SnappyCodec.getLibraryName();
}

isalDetail = ErasureCodeNative.getLoadingFailureReason();
if (isalDetail != null) {
Expand Down Expand Up @@ -152,7 +144,6 @@ public static void main(String[] args) {
System.out.printf("hadoop: %b %s%n", nativeHadoopLoaded, hadoopLibraryName);
System.out.printf("zlib: %b %s%n", zlibLoaded, zlibLibraryName);
System.out.printf("zstd : %b %s%n", zStdLoaded, zstdLibraryName);
System.out.printf("snappy: %b %s%n", snappyLoaded, snappyLibraryName);
System.out.printf("lz4: %b %s%n", lz4Loaded, lz4LibraryName);
System.out.printf("bzip2: %b %s%n", bzip2Loaded, bzip2LibraryName);
System.out.printf("openssl: %b %s%n", openSslLoaded, openSslDetail);
Expand All @@ -164,7 +155,7 @@ public static void main(String[] args) {
}

if ((!nativeHadoopLoaded) || (Shell.WINDOWS && (!winutilsExists)) ||
(checkAll && !(zlibLoaded && snappyLoaded && lz4Loaded
(checkAll && !(zlibLoaded && lz4Loaded
&& bzip2Loaded && isalLoaded && zStdLoaded))) {
// return 1 to indicate the check failed
ExitUtil.terminate(1);
Expand Down
Loading