diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java index b2a8e7f10e..4720c08445 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java @@ -32,23 +32,16 @@ * entire input in setInput and compresses it as one compressed block. */ public class SnappyCompressor implements Compressor { - private static final int initialBufferSize = 64 * 1024 * 1024; - // Buffer for compressed output. This buffer grows as necessary. - private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(initialBufferSize); + private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0); // Buffer for uncompressed input. This buffer grows as necessary. - private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(initialBufferSize); + private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0); private long bytesRead = 0L; private long bytesWritten = 0L; private boolean finishCalled = false; - public SnappyCompressor() { - inputBuffer.limit(0); - outputBuffer.limit(0); - } - /** * Fills specified buffer with compressed data. Returns actual number * of bytes of compressed data. A return value of 0 indicates that @@ -120,7 +113,8 @@ public synchronized void setInput(byte[] buffer, int off, int len) { @Override public void end() { - // No-op + CleanUtil.clean(inputBuffer); + CleanUtil.clean(outputBuffer); } @Override @@ -157,18 +151,6 @@ public void reinit(Configuration c) { @Override public synchronized void reset() { - if (inputBuffer.capacity() > initialBufferSize) { - ByteBuffer oldBuffer = inputBuffer; - inputBuffer = ByteBuffer.allocateDirect(initialBufferSize); - CleanUtil.clean(oldBuffer); - } - - if (outputBuffer.capacity() > initialBufferSize) { - ByteBuffer oldBuffer = outputBuffer; - outputBuffer = ByteBuffer.allocateDirect(initialBufferSize); - CleanUtil.clean(oldBuffer); - } - finishCalled = false; bytesRead = bytesWritten = 0; inputBuffer.rewind(); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java index 8a7f86d5ae..c3da63f9c4 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java @@ -27,21 +27,14 @@ import org.apache.parquet.Preconditions; public class SnappyDecompressor implements Decompressor { - private static final int initialBufferSize = 64 * 1024 * 1024; - // Buffer for uncompressed output. This buffer grows as necessary. - private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(initialBufferSize); + private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0); // Buffer for compressed input. This buffer grows as necessary. - private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(initialBufferSize); + private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0); private boolean finished; - public SnappyDecompressor() { - inputBuffer.limit(0); - outputBuffer.limit(0); - } - /** * Fills specified buffer with uncompressed data. Returns actual number * of bytes of uncompressed data. A return value of 0 indicates that @@ -122,7 +115,8 @@ public synchronized void setInput(byte[] buffer, int off, int len) { @Override public void end() { - // No-op + CleanUtil.clean(inputBuffer); + CleanUtil.clean(outputBuffer); } @Override @@ -142,18 +136,6 @@ public synchronized boolean needsInput() { @Override public synchronized void reset() { - if (inputBuffer.capacity() > initialBufferSize) { - ByteBuffer oldBuffer = inputBuffer; - inputBuffer = ByteBuffer.allocateDirect(initialBufferSize); - CleanUtil.clean(oldBuffer); - } - - if (outputBuffer.capacity() > initialBufferSize) { - ByteBuffer oldBuffer = outputBuffer; - outputBuffer = ByteBuffer.allocateDirect(initialBufferSize); - CleanUtil.clean(oldBuffer); - } - finished = false; inputBuffer.rewind(); outputBuffer.rewind();