10 changes: 5 additions & 5 deletions hadoop-common-project/hadoop-common/pom.xml
@@ -371,6 +371,11 @@
<artifactId>snappy-java</artifactId>
<scope>compile</scope>
</dependency>
+    <dependency>
+      <groupId>org.lz4</groupId>
+      <artifactId>lz4-java</artifactId>
+      <scope>provided</scope>
+    </dependency>
</dependencies>

<build>
@@ -577,11 +582,6 @@
<exclude>src/main/native/m4/*</exclude>
<exclude>src/test/empty-file</exclude>
<exclude>src/test/all-tests</exclude>
-        <exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4.h</exclude>
-        <exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4.c</exclude>
-        <exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4hc.h</exclude>
-        <exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4hc.c</exclude>
-        <exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4hc_encoder.h</exclude>
<exclude>src/main/native/gtest/**/*</exclude>
<exclude>src/test/resources/test-untar.tgz</exclude>
<exclude>src/test/resources/test.har/_SUCCESS</exclude>
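For context, the PR swaps Hadoop's bundled native lz4 sources (removed from the build files above) for the lz4-java bindings declared in the new dependency. A minimal round-trip sketch of that API, assuming only that lz4-java is on the classpath (class name and sample data are illustrative):

import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;

public class Lz4JavaRoundTrip {
  public static void main(String[] args) {
    // fastestInstance() prefers the JNI bindings and falls back to pure Java.
    LZ4Factory factory = LZ4Factory.fastestInstance();
    LZ4Compressor compressor = factory.fastCompressor();
    LZ4SafeDecompressor decompressor = factory.safeDecompressor();

    byte[] input = "hello hello hello lz4".getBytes();

    // Size the destination for the worst case before compressing.
    int maxLen = compressor.maxCompressedLength(input.length);
    byte[] compressed = new byte[maxLen];
    int compressedLen =
        compressor.compress(input, 0, input.length, compressed, 0, maxLen);

    // The "safe" decompressor only needs an upper bound on the output size,
    // not the exact original length.
    byte[] restored = new byte[input.length];
    int restoredLen =
        decompressor.decompress(compressed, 0, compressedLen, restored, 0, restored.length);

    System.out.println(new String(restored, 0, restoredLen));
  }
}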
4 changes: 0 additions & 4 deletions hadoop-common-project/hadoop-common/src/CMakeLists.txt
@@ -236,10 +236,6 @@ configure_file(${CMAKE_SOURCE_DIR}/config.h.cmake ${CMAKE_BINARY_DIR}/config.h)
set(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
hadoop_add_dual_library(hadoop
main/native/src/exception.c
-    ${SRC}/io/compress/lz4/Lz4Compressor.c
-    ${SRC}/io/compress/lz4/Lz4Decompressor.c
-    ${SRC}/io/compress/lz4/lz4.c
-    ${SRC}/io/compress/lz4/lz4hc.c
${ISAL_SOURCE_FILES}
${ZSTD_SOURCE_FILES}
${OPENSSL_SOURCE_FILES}
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java
@@ -27,17 +27,12 @@
import org.apache.hadoop.io.compress.lz4.Lz4Compressor;
import org.apache.hadoop.io.compress.lz4.Lz4Decompressor;
import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.util.NativeCodeLoader;

/**
* This class creates lz4 compressors/decompressors.
*/
public class Lz4Codec implements Configurable, CompressionCodec {

-  static {
-    NativeCodeLoader.isNativeCodeLoaded();
-  }
-
Configuration conf;

/**
@@ -60,19 +55,6 @@ public Configuration getConf() {
return conf;
}

-  /**
-   * Are the native lz4 libraries loaded & initialized?
-   *
-   * @return true if loaded & initialized, otherwise false
-   */
-  public static boolean isNativeCodeLoaded() {
-    return NativeCodeLoader.isNativeCodeLoaded();
-  }
-
-  public static String getLibraryName() {
-    return Lz4Compressor.getLibraryName();
-  }
-
/**
* Create a {@link CompressionOutputStream} that will write to the given
* {@link OutputStream}.
@@ -101,9 +83,6 @@ public CompressionOutputStream createOutputStream(OutputStream out)
public CompressionOutputStream createOutputStream(OutputStream out,
Compressor compressor)
throws IOException {
-    if (!isNativeCodeLoaded()) {
-      throw new RuntimeException("native lz4 library not available");
-    }
int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
@@ -121,10 +100,6 @@ public CompressionOutputStream createOutputStream(OutputStream out,
*/
@Override
public Class<? extends Compressor> getCompressorType() {
-    if (!isNativeCodeLoaded()) {
-      throw new RuntimeException("native lz4 library not available");
-    }
-
return Lz4Compressor.class;
}

@@ -135,9 +110,6 @@ public Class<? extends Compressor> getCompressorType() {
*/
@Override
public Compressor createCompressor() {
-    if (!isNativeCodeLoaded()) {
-      throw new RuntimeException("native lz4 library not available");
-    }
int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
@@ -175,10 +147,6 @@ public CompressionInputStream createInputStream(InputStream in)
public CompressionInputStream createInputStream(InputStream in,
Decompressor decompressor)
throws IOException {
-    if (!isNativeCodeLoaded()) {
-      throw new RuntimeException("native lz4 library not available");
-    }
-
return new BlockDecompressorStream(in, decompressor, conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT));
@@ -191,10 +159,6 @@ public CompressionInputStream createInputStream(InputStream in,
*/
@Override
public Class<? extends Decompressor> getDecompressorType() {
-    if (!isNativeCodeLoaded()) {
-      throw new RuntimeException("native lz4 library not available");
-    }
-
return Lz4Decompressor.class;
}

@@ -205,9 +169,6 @@ public Class<? extends Decompressor> getDecompressorType() {
*/
@Override
public Decompressor createDecompressor() {
-    if (!isNativeCodeLoaded()) {
-      throw new RuntimeException("native lz4 library not available");
-    }
int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
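With the native-loading guards removed, Lz4Codec becomes usable wherever lz4-java is on the classpath. A usage sketch, assuming hadoop-common and lz4-java are both available (the output file name is illustrative):

import java.io.FileOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Lz4Codec;

public class Lz4CodecExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Buffer size handed to BlockCompressorStream above (256 KB default).
    conf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
        256 * 1024);

    Lz4Codec codec = new Lz4Codec();
    codec.setConf(conf);

    try (CompressionOutputStream out =
             codec.createOutputStream(new FileOutputStream("data.lz4"))) {
      out.write("payload".getBytes());
    } // close() finishes the compressed block
  }
}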
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java
@@ -22,9 +22,11 @@
import java.nio.Buffer;
import java.nio.ByteBuffer;

+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4Compressor;
Contributor: put into the same import block as org.slf4j

Member Author: They are already in the same block, no?

import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4Compressor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.util.NativeCodeLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -49,22 +51,7 @@ public class Lz4Compressor implements Compressor {
private long bytesRead = 0L;
private long bytesWritten = 0L;

-  private final boolean useLz4HC;
-
-  static {
-    if (NativeCodeLoader.isNativeCodeLoaded()) {
-      // Initialize the native library
-      try {
-        initIDs();
-      } catch (Throwable t) {
-        // Ignore failure to load/initialize lz4
-        LOG.warn(t.toString());
-      }
-    } else {
-      LOG.error("Cannot load " + Lz4Compressor.class.getName() +
-          " without native hadoop library!");
-    }
-  }
+  private final LZ4Compressor lz4Compressor;

/**
* Creates a new compressor.
@@ -74,9 +61,21 @@ public class Lz4Compressor implements Compressor {
* which trades CPU for compression ratio.
*/
public Lz4Compressor(int directBufferSize, boolean useLz4HC) {
-    this.useLz4HC = useLz4HC;
this.directBufferSize = directBufferSize;

+    try {
+      LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
+      if (useLz4HC) {
Member: nit: seems we no longer need the field useLz4HC with this.

Member Author: Oh, yes.
+        lz4Compressor = lz4Factory.highCompressor();
Member: The library also allows configuring the compression level, so perhaps we can add a Hadoop option to enable that later. This just uses the default compression level.

Member Author: Yeah, sounds good. We can add it later if it is necessary.
+      } else {
+        lz4Compressor = lz4Factory.fastCompressor();
+      }
+    } catch (AssertionError t) {
+      throw new RuntimeException("lz4-java library is not available: " +
+          "Lz4Compressor has not been loaded. You need to add " +
+          "lz4-java.jar to your CLASSPATH. " + t, t);
+    }
+
uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);

// Compression is guaranteed to succeed if 'dstCapacity' >=
@@ -243,7 +242,7 @@ public synchronized int compress(byte[] b, int off, int len)
}

// Compress data
-      n = useLz4HC ? compressBytesDirectHC() : compressBytesDirect();
+      n = compressDirectBuf();
compressedDirectBuf.limit(n);
uncompressedDirectBuf.clear(); // lz4 consumes all buffer input

@@ -309,11 +308,20 @@ public synchronized long getBytesWritten() {
public synchronized void end() {
}

-  private native static void initIDs();
-
-  private native int compressBytesDirect();
-
-  private native int compressBytesDirectHC();
-
-  public native static String getLibraryName();
+  private int compressDirectBuf() {
Member: seems some of the methods in this class look exactly the same as in SnappyCompressor - perhaps we can do some refactoring later.

Member Author: Ok, currently this PR focuses on switching to lz4-java. We can refactor the compressor/decompressor classes later.

+    if (uncompressedDirectBufLen == 0) {
+      return 0;
+    } else {
+      // Set the position and limit of `uncompressedDirectBuf` for reading
+      uncompressedDirectBuf.limit(uncompressedDirectBufLen).position(0);
+      compressedDirectBuf.clear();
Member: I think this isn't necessary since it's called right before the call site?

Member Author: We can remove the clear and limit calls at the call site:

    // Re-initialize the lz4's output direct-buffer
    compressedDirectBuf.clear();
    compressedDirectBuf.limit(0);

But we cannot remove the clear in the compressDirectBuf method.

First, lz4-java cannot accept compressedDirectBuf if it is passed in with limit(0):

[ERROR] Failures:
[ERROR]   TestCompressorDecompressor.testCompressorDecompressor:69  Expected to find 'testCompressorDecompressor error !!!' but got unexpected exception: net.jpountz.lz4.LZ4Exception: maxDestLen is too small
        at net.jpountz.lz4.LZ4JNICompressor.compress(LZ4JNICompressor.java:69)
        at net.jpountz.lz4.LZ4Compressor.compress(LZ4Compressor.java:158)
        at org.apache.hadoop.io.compress.lz4.Lz4Compressor.compressDirectBuf(Lz4Compressor.java:310)
        at org.apache.hadoop.io.compress.lz4.Lz4Compressor.compress(Lz4Compressor.java:237)
        at org.apache.hadoop.io.compress.CompressDecompressTester$CompressionTestStrategy$2.assertCompression(CompressDecompressTester.java:286)
        at org.apache.hadoop.io.compress.CompressDecompressTester.test(CompressDecompressTester.java:114)
        at org.apache.hadoop.io.compress.TestCompressorDecompressor.testCompressorDecompressor(TestCompressorDecompressor.java:66)

Second, even if we remove the limit call at the call site, one test still fails:

[ERROR] testCompressorDecompressor(org.apache.hadoop.io.compress.TestCompressorDecompressor)  Time elapsed: 0.539 s  <<< FAILURE!
java.lang.AssertionError: org.apache.hadoop.io.compress.lz4.Lz4Compressor_org.apache.hadoop.io.compress.lz4.Lz4Decompressor- empty stream compressed output size != 4 expected:<4> but was:<65796>
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.failNotEquals(Assert.java:834)
        at org.junit.Assert.assertEquals(Assert.java:645)
        at org.apache.hadoop.io.compress.CompressDecompressTester$CompressionTestStrategy$3.assertCompression(CompressDecompressTester.java:334)
        at org.apache.hadoop.io.compress.CompressDecompressTester.test(CompressDecompressTester.java:114)
        at org.apache.hadoop.io.compress.TestCompressorDecompressor.testCompressorDecompressor(TestCompressorDecompressor.java:66)

Member: Okay, makes sense.
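The constraint behind the first failure: the ByteBuffer overload of LZ4Compressor.compress uses dest.remaining() as maxDestLen and throws if the worst-case compressed size does not fit, so the output buffer must be clear()ed (limit == capacity) before the call. A minimal sketch of both calls, assuming lz4-java is on the classpath (buffer sizes illustrative):

import java.nio.ByteBuffer;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;

public class MaxDestLenDemo {
  public static void main(String[] args) {
    LZ4Compressor fast = LZ4Factory.fastestInstance().fastCompressor();

    ByteBuffer src = ByteBuffer.allocateDirect(64 * 1024);
    src.put(new byte[1024]);
    src.flip();                  // 1 KB ready for reading

    ByteBuffer dst = ByteBuffer.allocateDirect(64 * 1024);
    dst.clear();                 // limit == capacity: worst case fits
    fast.compress(src, dst);     // ok; advances both buffers' positions

    src.position(0);
    src.limit(1024);             // rewind the input
    dst.clear();
    dst.limit(0);                // remaining() == 0, as at the old call site
    fast.compress(src, dst);     // throws LZ4Exception: maxDestLen is too small
  }
}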

+      lz4Compressor.compress((ByteBuffer) uncompressedDirectBuf,
+          (ByteBuffer) compressedDirectBuf);
+      uncompressedDirectBufLen = 0;
+      uncompressedDirectBuf.limit(directBufferSize).position(0);
+      int size = compressedDirectBuf.position();
+      compressedDirectBuf.position(0);
+      return size;
+    }
+  }
}
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java
@@ -22,8 +22,10 @@
import java.nio.Buffer;
import java.nio.ByteBuffer;

+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4SafeDecompressor;
Contributor: again, not in same import block as org.apache

Member Author (@viirya, Oct 14, 2020): Already in same block?

import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Member: @viirya I think steveloughran is referring to the order of imports and the blank line between import blocks.
https://github.com/steveloughran/formality/blob/master/styleguide/styleguide.md#imports

While the original file violates the coding standard, and we usually avoid large diffs just to fix formatting issues, it would be ok to fix a few lines here.

Contributor: yeah, if things are mixed up, best to leave alone - at least for those files which get lots of changes. For something which rarely sees maintenance, you can make a stronger case for cleanup. I do it sometimes, but as I also get to field cherrypick merge pain, I don't go wild on it.

import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.util.NativeCodeLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -44,20 +46,7 @@ public class Lz4Decompressor implements Decompressor {
private int userBufOff = 0, userBufLen = 0;
private boolean finished;

-  static {
-    if (NativeCodeLoader.isNativeCodeLoaded()) {
-      // Initialize the native library
-      try {
-        initIDs();
-      } catch (Throwable t) {
-        // Ignore failure to load/initialize lz4
-        LOG.warn(t.toString());
-      }
-    } else {
-      LOG.error("Cannot load " + Lz4Compressor.class.getName() +
-          " without native hadoop library!");
-    }
-  }
+  private LZ4SafeDecompressor lz4Decompressor;

/**
* Creates a new compressor.
@@ -67,6 +56,15 @@ public class Lz4Decompressor implements Decompressor {
public Lz4Decompressor(int directBufferSize) {
this.directBufferSize = directBufferSize;

+    try {
+      LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
+      lz4Decompressor = lz4Factory.safeDecompressor();
+    } catch (AssertionError t) {
+      throw new RuntimeException("lz4-java library is not available: " +
+          "Lz4Decompressor has not been loaded. You need to add " +
+          "lz4-java.jar to your CLASSPATH. " + t, t);
+    }
+
compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
uncompressedDirectBuf.position(directBufferSize);
@@ -200,7 +198,7 @@ public synchronized boolean finished() {
* @param b Buffer for the compressed data
* @param off Start offset of the data
* @param len Size of the buffer
-   * @return The actual number of bytes of compressed data.
+   * @return The actual number of bytes of uncompressed data.
* @throws IOException
*/
@Override
@@ -228,7 +226,7 @@ public synchronized int decompress(byte[] b, int off, int len)
uncompressedDirectBuf.limit(directBufferSize);

// Decompress data
-      n = decompressBytesDirect();
+      n = decompressDirectBuf();
uncompressedDirectBuf.limit(n);

if (userBufLen <= 0) {
@@ -272,7 +270,18 @@ public synchronized void end() {
// do nothing
}

-  private native static void initIDs();
-
-  private native int decompressBytesDirect();
+  private int decompressDirectBuf() {
+    if (compressedDirectBufLen == 0) {
Member: I don't think this will ever happen but it's not a big deal.

+      return 0;
+    } else {
+      compressedDirectBuf.limit(compressedDirectBufLen).position(0);
+      lz4Decompressor.decompress((ByteBuffer) compressedDirectBuf,
+          (ByteBuffer) uncompressedDirectBuf);
+      compressedDirectBufLen = 0;
+      compressedDirectBuf.clear();
+      int size = uncompressedDirectBuf.position();
+      uncompressedDirectBuf.position(0);
+      return size;
+    }
+  }
}
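Like the compressor, LZ4SafeDecompressor.decompress(ByteBuffer, ByteBuffer) advances both buffers' positions, which is why decompressDirectBuf reads the produced size back from the destination's position. A round-trip sketch under the same direct-buffer conventions, assuming lz4-java on the classpath (sizes and payload illustrative):

import java.nio.ByteBuffer;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;

public class DirectBufRoundTrip {
  public static void main(String[] args) {
    LZ4Factory factory = LZ4Factory.fastestInstance();
    LZ4Compressor fast = factory.fastCompressor();
    LZ4SafeDecompressor safe = factory.safeDecompressor();

    ByteBuffer raw = ByteBuffer.allocateDirect(64 * 1024);
    raw.put("abcabcabcabc".getBytes());
    raw.flip();

    ByteBuffer packed = ByteBuffer.allocateDirect(64 * 1024);
    fast.compress(raw, packed);         // advances raw and packed
    packed.flip();                      // expose the compressed bytes for reading

    ByteBuffer restored = ByteBuffer.allocateDirect(64 * 1024);
    safe.decompress(packed, restored);  // advances packed and restored
    int produced = restored.position(); // same trick as decompressDirectBuf()
    restored.position(0);
    System.out.println(produced + " bytes restored");
  }
}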
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java
@@ -22,7 +22,6 @@
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.OpensslCipher;
-import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -69,8 +68,6 @@ public static void main(String[] args) {
boolean isalLoaded = false;
boolean zStdLoaded = false;
boolean pmdkLoaded = false;
-    // lz4 is linked within libhadoop
-    boolean lz4Loaded = nativeHadoopLoaded;
boolean bzip2Loaded = Bzip2Factory.isNativeBzip2Loaded(conf);
boolean openSslLoaded = false;
boolean winutilsExists = false;
@@ -81,7 +78,6 @@
String isalDetail = "";
String pmdkDetail = "";
String zstdLibraryName = "";
-    String lz4LibraryName = "";
String bzip2LibraryName = "";
String winutilsPath = null;

@@ -119,9 +115,6 @@ public static void main(String[] args) {
openSslLoaded = true;
}

-    if (lz4Loaded) {
-      lz4LibraryName = Lz4Codec.getLibraryName();
-    }
if (bzip2Loaded) {
bzip2LibraryName = Bzip2Factory.getLibraryName(conf);
}
@@ -144,7 +137,6 @@
System.out.printf("hadoop: %b %s%n", nativeHadoopLoaded, hadoopLibraryName);
System.out.printf("zlib: %b %s%n", zlibLoaded, zlibLibraryName);
System.out.printf("zstd : %b %s%n", zStdLoaded, zstdLibraryName);
System.out.printf("lz4: %b %s%n", lz4Loaded, lz4LibraryName);
System.out.printf("bzip2: %b %s%n", bzip2Loaded, bzip2LibraryName);
System.out.printf("openssl: %b %s%n", openSslLoaded, openSslDetail);
System.out.printf("ISA-L: %b %s%n", isalLoaded, isalDetail);
@@ -155,8 +147,8 @@
}

if ((!nativeHadoopLoaded) || (Shell.WINDOWS && (!winutilsExists)) ||
-        (checkAll && !(zlibLoaded && lz4Loaded
-        && bzip2Loaded && isalLoaded && zStdLoaded))) {
+        (checkAll && !(zlibLoaded && bzip2Loaded
+        && isalLoaded && zStdLoaded))) {
// return 1 to indicated check failed
ExitUtil.terminate(1);
}