Skip to content

Commit 0a2c831

Browse files
viiryaparthchandra
authored andcommitted
Call decompressor ByteBuffer API and make ByteBufferBytesInput public (apache#15)
* Call decompressor ByteBuffer API * fix * Remove unused import * Make ByteBufferBytesInput public * For review Co-authored-by: Liang-Chi Hsieh <[email protected]>
1 parent 4bc0537 commit 0a2c831

File tree

2 files changed

+27
-3
lines changed

2 files changed

+27
-3
lines changed

parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,7 @@ public long size() {
515515
}
516516
}
517517

518-
private static class ByteBufferBytesInput extends BytesInput {
518+
public static class ByteBufferBytesInput extends BytesInput {
519519
private final ByteBuffer buffer;
520520

521521
private ByteBufferBytesInput(ByteBuffer buffer) {

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java

+26-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.parquet.hadoop;
2020

2121
import java.io.IOException;
22+
import java.nio.ByteBuffer;
2223
import java.util.ArrayDeque;
2324
import java.util.HashMap;
2425
import java.util.Iterator;
@@ -166,10 +167,33 @@ public DataPage readPage() {
166167
public DataPage visit(DataPageV1 dataPageV1) {
167168
try {
168169
BytesInput bytes = dataPageV1.getBytes();
170+
ByteBuffer byteBuffer = bytes.toByteBuffer();
171+
long compressedSize = bytes.size();
172+
169173
if (null != blockDecryptor) {
170-
bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
174+
byte[] decrypted = blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD);
175+
compressedSize = decrypted.length;
176+
byteBuffer = ByteBuffer.allocateDirect((int) compressedSize);
177+
byteBuffer.put(decrypted);
178+
byteBuffer.flip();
179+
}
180+
181+
if (!byteBuffer.isDirect()) {
182+
ByteBuffer directByteBuffer = ByteBuffer.allocateDirect((int) compressedSize);;
183+
directByteBuffer.put(byteBuffer);
184+
directByteBuffer.flip();
185+
byteBuffer = directByteBuffer;
186+
}
187+
188+
// The input/output bytebuffers must be direct for (bytebuffer-based, native) decompressor
189+
ByteBuffer decompressedBuffer = ByteBuffer.allocateDirect(dataPageV1.getUncompressedSize());
190+
decompressor.decompress(byteBuffer, (int) compressedSize, decompressedBuffer, dataPageV1.getUncompressedSize());
191+
192+
// HACKY: sometimes we need to do `flip` because the position of output bytebuffer is not reset.
193+
if (decompressedBuffer.position() != 0) {
194+
decompressedBuffer.flip();
171195
}
172-
BytesInput decompressed = decompressor.decompress(bytes, dataPageV1.getUncompressedSize());
196+
BytesInput decompressed = BytesInput.from(decompressedBuffer);
173197

174198
final DataPageV1 decompressedPage;
175199
if (offsetIndex == null) {

0 commit comments

Comments
 (0)