Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions hadoop-common-project/hadoop-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,11 @@
<artifactId>snappy-java</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -577,11 +582,6 @@
<exclude>src/main/native/m4/*</exclude>
<exclude>src/test/empty-file</exclude>
<exclude>src/test/all-tests</exclude>
<exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4.h</exclude>
<exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4.c</exclude>
<exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4hc.h</exclude>
<exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4hc.c</exclude>
<exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4hc_encoder.h</exclude>
<exclude>src/main/native/gtest/**/*</exclude>
<exclude>src/test/resources/test-untar.tgz</exclude>
<exclude>src/test/resources/test.har/_SUCCESS</exclude>
Expand Down
4 changes: 0 additions & 4 deletions hadoop-common-project/hadoop-common/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,6 @@ configure_file(${CMAKE_SOURCE_DIR}/config.h.cmake ${CMAKE_BINARY_DIR}/config.h)
set(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
hadoop_add_dual_library(hadoop
main/native/src/exception.c
${SRC}/io/compress/lz4/Lz4Compressor.c
${SRC}/io/compress/lz4/Lz4Decompressor.c
${SRC}/io/compress/lz4/lz4.c
${SRC}/io/compress/lz4/lz4hc.c
${ISAL_SOURCE_FILES}
${ZSTD_SOURCE_FILES}
${OPENSSL_SOURCE_FILES}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,12 @@
import org.apache.hadoop.io.compress.lz4.Lz4Compressor;
import org.apache.hadoop.io.compress.lz4.Lz4Decompressor;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.util.NativeCodeLoader;

/**
* This class creates lz4 compressors/decompressors.
*/
public class Lz4Codec implements Configurable, CompressionCodec {

static {
NativeCodeLoader.isNativeCodeLoaded();
}

Configuration conf;

/**
Expand All @@ -60,19 +55,6 @@ public Configuration getConf() {
return conf;
}

/**
* Are the native lz4 libraries loaded &amp; initialized?
*
* @return true if loaded &amp; initialized, otherwise false
*/
public static boolean isNativeCodeLoaded() {
return NativeCodeLoader.isNativeCodeLoaded();
}

public static String getLibraryName() {
return Lz4Compressor.getLibraryName();
}

/**
* Create a {@link CompressionOutputStream} that will write to the given
* {@link OutputStream}.
Expand Down Expand Up @@ -101,9 +83,6 @@ public CompressionOutputStream createOutputStream(OutputStream out)
public CompressionOutputStream createOutputStream(OutputStream out,
Compressor compressor)
throws IOException {
if (!isNativeCodeLoaded()) {
throw new RuntimeException("native lz4 library not available");
}
int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
Expand All @@ -121,10 +100,6 @@ public CompressionOutputStream createOutputStream(OutputStream out,
*/
@Override
public Class<? extends Compressor> getCompressorType() {
if (!isNativeCodeLoaded()) {
throw new RuntimeException("native lz4 library not available");
}

return Lz4Compressor.class;
}

Expand All @@ -135,9 +110,6 @@ public Class<? extends Compressor> getCompressorType() {
*/
@Override
public Compressor createCompressor() {
if (!isNativeCodeLoaded()) {
throw new RuntimeException("native lz4 library not available");
}
int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
Expand Down Expand Up @@ -175,10 +147,6 @@ public CompressionInputStream createInputStream(InputStream in)
public CompressionInputStream createInputStream(InputStream in,
Decompressor decompressor)
throws IOException {
if (!isNativeCodeLoaded()) {
throw new RuntimeException("native lz4 library not available");
}

return new BlockDecompressorStream(in, decompressor, conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT));
Expand All @@ -191,10 +159,6 @@ public CompressionInputStream createInputStream(InputStream in,
*/
@Override
public Class<? extends Decompressor> getDecompressorType() {
if (!isNativeCodeLoaded()) {
throw new RuntimeException("native lz4 library not available");
}

return Lz4Decompressor.class;
}

Expand All @@ -205,9 +169,6 @@ public Class<? extends Decompressor> getDecompressorType() {
*/
@Override
public Decompressor createDecompressor() {
if (!isNativeCodeLoaded()) {
throw new RuntimeException("native lz4 library not available");
}
int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import java.nio.Buffer;
import java.nio.ByteBuffer;

import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4Compressor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.util.NativeCodeLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -48,22 +50,7 @@ public class Lz4Compressor implements Compressor {
private long bytesRead = 0L;
private long bytesWritten = 0L;

private final boolean useLz4HC;

static {
if (NativeCodeLoader.isNativeCodeLoaded()) {
// Initialize the native library
try {
initIDs();
} catch (Throwable t) {
// Ignore failure to load/initialize lz4
LOG.warn(t.toString());
}
} else {
LOG.error("Cannot load " + Lz4Compressor.class.getName() +
" without native hadoop library!");
}
}
private final LZ4Compressor lz4Compressor;

/**
* Creates a new compressor.
Expand All @@ -73,9 +60,21 @@ public class Lz4Compressor implements Compressor {
* which trades CPU for compression ratio.
*/
public Lz4Compressor(int directBufferSize, boolean useLz4HC) {
this.useLz4HC = useLz4HC;
this.directBufferSize = directBufferSize;

try {
LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
if (useLz4HC) {
lz4Compressor = lz4Factory.highCompressor();
} else {
lz4Compressor = lz4Factory.fastCompressor();
}
} catch (AssertionError t) {
throw new RuntimeException("lz4-java library is not available: " +
"Lz4Compressor has not been loaded. You need to add " +
"lz4-java.jar to your CLASSPATH. " + t, t);
}

uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
compressedDirectBuf.position(directBufferSize);
Expand Down Expand Up @@ -236,7 +235,7 @@ public synchronized int compress(byte[] b, int off, int len)
}

// Compress data
n = useLz4HC ? compressBytesDirectHC() : compressBytesDirect();
n = compressDirectBuf();
compressedDirectBuf.limit(n);
uncompressedDirectBuf.clear(); // lz4 consumes all buffer input

Expand Down Expand Up @@ -302,11 +301,20 @@ public synchronized long getBytesWritten() {
public synchronized void end() {
}

private native static void initIDs();

private native int compressBytesDirect();

private native int compressBytesDirectHC();

public native static String getLibraryName();
/**
 * Compresses the pending contents of {@code uncompressedDirectBuf} into
 * {@code compressedDirectBuf} via the lz4-java {@code LZ4Compressor}.
 *
 * On return, {@code uncompressedDirectBuf} is reset for writing
 * (limit = {@code directBufferSize}, position = 0,
 * {@code uncompressedDirectBufLen} = 0) and {@code compressedDirectBuf}
 * holds the compressed bytes starting at position 0.
 *
 * NOTE(review): {@code LZ4Compressor.compress(ByteBuffer, ByteBuffer)}
 * throws if the destination is too small, and lz4's worst-case output can
 * exceed the input size; this assumes {@code compressedDirectBuf} was
 * allocated with enough headroom (compression overhead) for a full
 * {@code directBufferSize} input — confirm against the constructor.
 *
 * @return the number of compressed bytes produced, or 0 if there was no
 *         buffered input to compress
 */
private int compressDirectBuf() {
  if (uncompressedDirectBufLen == 0) {
    return 0;
  } else {
    // Set the position and limit of `uncompressedDirectBuf` for reading
    uncompressedDirectBuf.limit(uncompressedDirectBufLen).position(0);
    // Make the whole destination buffer writable before compressing.
    compressedDirectBuf.clear();
    // Casts needed: fields are typed as java.nio.Buffer.
    lz4Compressor.compress((ByteBuffer) uncompressedDirectBuf,
        (ByteBuffer) compressedDirectBuf);
    // All input was consumed; reset the source buffer for the next write.
    uncompressedDirectBufLen = 0;
    uncompressedDirectBuf.limit(directBufferSize).position(0);
    // compress() advanced the destination's position by the bytes written;
    // capture that count, then rewind so callers can read from position 0.
    int size = compressedDirectBuf.position();
    compressedDirectBuf.position(0);
    return size;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import java.nio.Buffer;
import java.nio.ByteBuffer;

import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;

import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.util.NativeCodeLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,20 +46,7 @@ public class Lz4Decompressor implements Decompressor {
private int userBufOff = 0, userBufLen = 0;
private boolean finished;

static {
if (NativeCodeLoader.isNativeCodeLoaded()) {
// Initialize the native library
try {
initIDs();
} catch (Throwable t) {
// Ignore failure to load/initialize lz4
LOG.warn(t.toString());
}
} else {
LOG.error("Cannot load " + Lz4Compressor.class.getName() +
" without native hadoop library!");
}
}
private LZ4SafeDecompressor lz4Decompressor;

/**
* Creates a new decompressor.
Expand All @@ -67,6 +56,15 @@ public class Lz4Decompressor implements Decompressor {
public Lz4Decompressor(int directBufferSize) {
this.directBufferSize = directBufferSize;

try {
LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
lz4Decompressor = lz4Factory.safeDecompressor();
} catch (AssertionError t) {
throw new RuntimeException("lz4-java library is not available: " +
"Lz4Decompressor has not been loaded. You need to add " +
"lz4-java.jar to your CLASSPATH. " + t, t);
}

compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
uncompressedDirectBuf.position(directBufferSize);
Expand Down Expand Up @@ -200,7 +198,7 @@ public synchronized boolean finished() {
* @param b Buffer for the compressed data
* @param off Start offset of the data
* @param len Size of the buffer
* @return The actual number of bytes of compressed data.
* @return The actual number of bytes of uncompressed data.
* @throws IOException
*/
@Override
Expand Down Expand Up @@ -228,7 +226,7 @@ public synchronized int decompress(byte[] b, int off, int len)
uncompressedDirectBuf.limit(directBufferSize);

// Decompress data
n = decompressBytesDirect();
n = decompressDirectBuf();
uncompressedDirectBuf.limit(n);

if (userBufLen <= 0) {
Expand Down Expand Up @@ -272,7 +270,18 @@ public synchronized void end() {
// do nothing
}

private native static void initIDs();

private native int decompressBytesDirect();
/**
 * Decompresses the pending contents of {@code compressedDirectBuf} into
 * {@code uncompressedDirectBuf} via the lz4-java
 * {@code LZ4SafeDecompressor}.
 *
 * On return, {@code compressedDirectBuf} is cleared for the next chunk
 * ({@code compressedDirectBufLen} = 0) and {@code uncompressedDirectBuf}
 * holds the decompressed bytes starting at position 0.
 *
 * NOTE(review): the safe decompressor throws if the destination is too
 * small; this assumes each compressed chunk expands to at most
 * {@code directBufferSize} bytes (the block-stream framing appears to
 * guarantee this) — confirm against the stream writer.
 *
 * @return the number of uncompressed bytes produced, or 0 if there was no
 *         buffered compressed input
 */
private int decompressDirectBuf() {
  if (compressedDirectBufLen == 0) {
    return 0;
  } else {
    // Set the position and limit of `compressedDirectBuf` for reading.
    compressedDirectBuf.limit(compressedDirectBufLen).position(0);
    // Casts needed: fields are typed as java.nio.Buffer.
    lz4Decompressor.decompress((ByteBuffer) compressedDirectBuf,
        (ByteBuffer) uncompressedDirectBuf);
    // All compressed input was consumed; reset for the next chunk.
    compressedDirectBufLen = 0;
    compressedDirectBuf.clear();
    // decompress() advanced the destination's position by the bytes
    // written; capture that count, then rewind for readers.
    int size = uncompressedDirectBuf.position();
    uncompressedDirectBuf.position(0);
    return size;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.OpensslCipher;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -69,8 +68,6 @@ public static void main(String[] args) {
boolean isalLoaded = false;
boolean zStdLoaded = false;
boolean pmdkLoaded = false;
// lz4 is linked within libhadoop
boolean lz4Loaded = nativeHadoopLoaded;
boolean bzip2Loaded = Bzip2Factory.isNativeBzip2Loaded(conf);
boolean openSslLoaded = false;
boolean winutilsExists = false;
Expand All @@ -81,7 +78,6 @@ public static void main(String[] args) {
String isalDetail = "";
String pmdkDetail = "";
String zstdLibraryName = "";
String lz4LibraryName = "";
String bzip2LibraryName = "";
String winutilsPath = null;

Expand Down Expand Up @@ -119,9 +115,6 @@ public static void main(String[] args) {
openSslLoaded = true;
}

if (lz4Loaded) {
lz4LibraryName = Lz4Codec.getLibraryName();
}
if (bzip2Loaded) {
bzip2LibraryName = Bzip2Factory.getLibraryName(conf);
}
Expand All @@ -144,7 +137,6 @@ public static void main(String[] args) {
System.out.printf("hadoop: %b %s%n", nativeHadoopLoaded, hadoopLibraryName);
System.out.printf("zlib: %b %s%n", zlibLoaded, zlibLibraryName);
System.out.printf("zstd : %b %s%n", zStdLoaded, zstdLibraryName);
System.out.printf("lz4: %b %s%n", lz4Loaded, lz4LibraryName);
System.out.printf("bzip2: %b %s%n", bzip2Loaded, bzip2LibraryName);
System.out.printf("openssl: %b %s%n", openSslLoaded, openSslDetail);
System.out.printf("ISA-L: %b %s%n", isalLoaded, isalDetail);
Expand All @@ -155,8 +147,8 @@ public static void main(String[] args) {
}

if ((!nativeHadoopLoaded) || (Shell.WINDOWS && (!winutilsExists)) ||
(checkAll && !(zlibLoaded && lz4Loaded
&& bzip2Loaded && isalLoaded && zStdLoaded))) {
(checkAll && !(zlibLoaded && bzip2Loaded
&& isalLoaded && zStdLoaded))) {
// return 1 to indicated check failed
ExitUtil.terminate(1);
}
Expand Down
Loading