diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java index 931b4b157e..8b47977ef6 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java @@ -24,12 +24,11 @@ import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; import static org.apache.parquet.column.ValuesType.VALUES; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.parquet.CorruptDeltaByteArrays; import org.apache.parquet.VersionParser.ParsedVersion; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.ColumnDescriptor; @@ -549,7 +548,7 @@ public Void visit(DataPageV2 dataPageV2) { }); } - private void initDataReader(Encoding dataEncoding, ByteBuffer bytes, int offset, int valueCount) { + private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) { ValuesReader previousReader = this.dataColumn; this.currentEncoding = dataEncoding; @@ -565,13 +564,15 @@ private void initDataReader(Encoding dataEncoding, ByteBuffer bytes, int offset, } else { this.dataColumn = dataEncoding.getValuesReader(path, VALUES); } + if (dataEncoding.usesDictionary() && converter.hasDictionarySupport()) { bindToDictionary(dictionary); } else { bind(path.getType()); } + try { - dataColumn.initFromPage(pageValueCount, bytes, offset); + dataColumn.initFromPage(pageValueCount, in); } catch (IOException e) { throw new ParquetDecodingException("could not read page in col " + path, e); } @@ -589,16 +590,15 @@ private void readPageV1(DataPageV1 page) { this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader); this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader); try { - ByteBuffer bytes = page.getBytes().toByteBuffer(); - LOG.debug("page size {} bytes and {} records", bytes.remaining(), pageValueCount); + BytesInput bytes = page.getBytes(); + LOG.debug("page size {} bytes and {} records", bytes.size(), pageValueCount); LOG.debug("reading repetition levels at 0"); - rlReader.initFromPage(pageValueCount, bytes, 0); - int next = rlReader.getNextOffset(); - LOG.debug("reading definition levels at {}", next); - dlReader.initFromPage(pageValueCount, bytes, next); - next = dlReader.getNextOffset(); - LOG.debug("reading data at {}", next); - initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount()); + ByteBufferInputStream in = bytes.toInputStream(); + rlReader.initFromPage(pageValueCount, in); + LOG.debug("reading definition levels at {}", in.position()); + dlReader.initFromPage(pageValueCount, in); + LOG.debug("reading data at {}", in.position()); + initDataReader(page.getValueEncoding(), in, page.getValueCount()); } catch (IOException e) { throw new ParquetDecodingException("could not read page " + page + " in col " + path, e); } @@ -607,9 +607,9 @@ private void readPageV1(DataPageV1 page) { private void readPageV2(DataPageV2 page) { this.repetitionLevelColumn = newRLEIterator(path.getMaxRepetitionLevel(), page.getRepetitionLevels()); this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels()); + LOG.debug("page data size {} bytes and {} records", page.getData().size(), pageValueCount); try { - LOG.debug("page data 
size {} bytes and {} records", page.getData().size(), pageValueCount); - initDataReader(page.getDataEncoding(), page.getData().toByteBuffer(), 0, page.getValueCount()); + initDataReader(page.getDataEncoding(), page.getData().toInputStream(), page.getValueCount()); } catch (IOException e) { throw new ParquetDecodingException("could not read page " + page + " in col " + path, e); } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java index 03aa2f8128..b2ec2a5d1a 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java @@ -20,8 +20,7 @@ import java.io.IOException; -import java.nio.ByteBuffer; -import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.io.api.Binary; /** @@ -40,8 +39,9 @@ public abstract class ValuesReader { /** * Called to initialize the column reader from a part of a page. * - * The underlying implementation knows how much data to read, so a length - * is not provided. + * Implementations must consume all bytes from the input stream, leaving the + * stream ready to read the next section of data. The underlying + * implementation knows how much data to read, so a length is not provided. * * Each page may contain several sections: * <ul> *  <li> repetition levels column *  <li> definition levels column *  <li> data column * </ul> * - * This function is called with 'offset' pointing to the beginning of one of these sections, - * and should return the offset to the section following it. - * * @param valueCount count of values in this page - * @param page the array to read from containing the page data (repetition levels, definition levels, data) - * @param offset where to start reading from in the page + * @param in an input stream containing the page data at the correct offset * * @throws IOException */ - public abstract void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException; - - /** - * Same functionality as method of the same name that takes a ByteBuffer instead of a byte[]. - * - * This method is only provided for backward compatibility and will be removed in a future release. - * Please update any code using it as soon as possible. - * @see #initFromPage(int, ByteBuffer, int) - */ - @Deprecated - public void initFromPage(int valueCount, byte[] page, int offset) throws IOException { - this.initFromPage(valueCount, ByteBuffer.wrap(page), offset); - } - - /** - * Called to return offset of the next section - * @return offset of the next section - */ - public int getNextOffset() { - throw new ParquetDecodingException("Unsupported: cannot get offset of the next section."); - } + public abstract void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException; /** * usable when the encoding is dictionary based
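As a reading aid (not part of the patch): under the new contract, each reader consumes exactly its own section of the shared page stream and leaves the stream positioned at the next section, which is what lets ColumnReaderImpl chain the repetition-level, definition-level, and data readers over a single stream. A minimal sketch with a hypothetical fixed-width reader:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;

// Hypothetical reader of 4-byte little-endian ints, illustrating the contract:
// slice exactly this section's bytes so the stream is left at the next section.
public class FixedWidthIntReader extends ValuesReader {
  private ByteBuffer data;

  @Override
  public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
    // consume exactly 4 * valueCount bytes from the shared page stream
    this.data = in.slice(4 * valueCount).order(ByteOrder.LITTLE_ENDIAN);
  }

  @Override
  public int readInteger() {
    return data.getInt();
  }

  @Override
  public void skip() {
    data.position(data.position() + 4);
  }
}
```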
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java index a5608cbef2..bcc828bf44 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java @@ -22,7 +22,6 @@ import static org.apache.parquet.column.values.bitpacking.BitPacking.createBitPackingReader; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; @@ -44,7 +43,6 @@ public class BitPackingValuesReader extends ValuesReader { private ByteBufferInputStream in; private BitPackingReader bitPackingReader; private final int bitsPerValue; - private int nextOffset; /** * @param bound the maximum value stored by this column @@ -68,21 +66,16 @@ public int readInteger() { /** * {@inheritDoc} - * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBuffer, int) + * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBufferInputStream) */ @Override - public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { int effectiveBitLength = valueCount * bitsPerValue; int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength); LOG.debug("reading {} bytes for {} values of size {} bits.", length, valueCount, bitsPerValue); - this.in = new ByteBufferInputStream(in, offset, length); - this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount); - this.nextOffset = offset + length; - } - @Override - public int getNextOffset() { - return nextOffset; + this.in = stream.sliceStream(length); + this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount); } @Override diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java index 7c19340c8d..0445d25de6 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java @@ -19,11 +19,12 @@ package org.apache.parquet.column.values.bitpacking; import java.io.IOException; -import java.util.Arrays; import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.io.ParquetDecodingException; import org.slf4j.Logger; import
org.slf4j.LoggerFactory; @@ -36,9 +37,7 @@ public class ByteBitPackingValuesReader extends ValuesReader { private final BytePacker packer; private final int[] decoded = new int[VALUES_AT_A_TIME]; private int decodedPosition = VALUES_AT_A_TIME - 1; - private ByteBuffer encoded; - private int encodedPos; - private int nextOffset; + private ByteBufferInputStream in; public ByteBitPackingValuesReader(int bound, Packer packer) { this.bitWidth = BytesUtils.getWidthFromMaxInt(bound); @@ -49,37 +48,38 @@ public ByteBitPackingValuesReader(int bound, Packer packer) { public int readInteger() { ++ decodedPosition; if (decodedPosition == decoded.length) { - encoded.position(encodedPos); - if (encodedPos + bitWidth > encoded.limit()) { - // unpack8Values needs at least bitWidth bytes to read from, - // We have to fill in 0 byte at the end of encoded bytes. - byte[] tempEncode = new byte[bitWidth]; - encoded.get(tempEncode, 0, encoded.limit() - encodedPos); - packer.unpack8Values(tempEncode, 0, decoded, 0); - } else { - packer.unpack8Values(encoded, encodedPos, decoded, 0); + try { + if (in.available() < bitWidth) { + // unpack8Values needs at least bitWidth bytes to read from, + // We have to fill in 0 byte at the end of encoded bytes. + byte[] tempEncode = new byte[bitWidth]; + in.read(tempEncode, 0, in.available()); + packer.unpack8Values(tempEncode, 0, decoded, 0); + } else { + ByteBuffer encoded = in.slice(bitWidth); + packer.unpack8Values(encoded, encoded.position(), decoded, 0); + } + } catch (IOException e) { + throw new ParquetDecodingException("Failed to read packed values", e); } - encodedPos += bitWidth; decodedPosition = 0; } return decoded[decodedPosition]; } @Override - public void initFromPage(int valueCount, ByteBuffer page, int offset) + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { int effectiveBitLength = valueCount * bitWidth; int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength); // ceil - LOG.debug("reading {} bytes for {} values of size {} bits.", length, valueCount, bitWidth); - this.encoded = page; - this.encodedPos = offset; + LOG.debug("reading {} bytes for {} values of size {} bits.", + length, valueCount, bitWidth); + // work-around for null values. this will not happen for repetition or + // definition levels (never null), but will happen when valueCount has not + // been adjusted for null values in the data. 
+ length = Math.min(length, stream.available()); + this.in = stream.sliceStream(length); this.decodedPosition = VALUES_AT_A_TIME - 1; - this.nextOffset = offset + length; - } - - @Override - public int getNextOffset() { - return nextOffset; } @Override diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java index a3355d2acc..bf53846102 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java @@ -18,7 +18,6 @@ */ package org.apache.parquet.column.values.delta; -import java.io.ByteArrayInputStream; import java.io.IOException; import org.apache.parquet.bytes.ByteBufferInputStream; @@ -28,7 +27,6 @@ import org.apache.parquet.column.values.bitpacking.Packer; import org.apache.parquet.io.ParquetDecodingException; -import java.io.IOException; import java.nio.ByteBuffer; /** @@ -43,7 +41,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader { */ private int valuesRead; private long minDeltaInCurrentBlock; - private ByteBuffer page; + /** * stores the decoded values including the first value which is written to the header */ @@ -54,23 +52,16 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader { */ private int valuesBuffered; private ByteBufferInputStream in; - private int nextOffset; private DeltaBinaryPackingConfig config; private int[] bitWidths; /** - * eagerly load all the data into memory - * - * @param valueCount count of values in this page - * @param page the array to read from containing the page data (repetition levels, definition levels, data) - * @param offset where to start reading from in the page - * @throws IOException + * eagerly loads all the data into memory */ @Override - public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException { - in = new ByteBufferInputStream(page, offset, page.limit() - offset); + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { + this.in = stream; this.config = DeltaBinaryPackingConfig.readConfig(in); - this.page = page; this.totalValueCount = BytesUtils.readUnsignedVarInt(in); allocateValuesBuffer(); bitWidths = new int[config.miniBlockNumInABlock]; @@ -81,14 +72,8 @@ public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOE while (valuesBuffered < totalValueCount) { //values Buffered could be more than totalValueCount, since we flush on a mini block basis loadNewBlockToBuffer(); } - this.nextOffset = page.limit() - in.available(); } - - @Override - public int getNextOffset() { - return nextOffset; - } - + /** * the value buffer is allocated so that the size of it is multiple of mini block * because when writing, data is flushed on a mini block basis @@ -123,7 +108,7 @@ private void checkRead() { } } - private void loadNewBlockToBuffer() { + private void loadNewBlockToBuffer() throws IOException { try { minDeltaInCurrentBlock = BytesUtils.readZigZagVarLong(in); } catch (IOException e) { @@ -152,19 +137,18 @@ private void loadNewBlockToBuffer() { * * @param packer the packer created from bitwidth of current mini block */ - private void unpackMiniBlock(BytePackerForLong packer) { + private void unpackMiniBlock(BytePackerForLong packer) throws IOException { for (int j = 0; j < 
config.miniBlockSizeInValues; j += 8) { unpack8Values(packer); } } - private void unpack8Values(BytePackerForLong packer) { - //calculate the pos because the packer api uses array not stream - int pos = page.limit() - in.available(); - packer.unpack8Values(page, pos, valuesBuffer, valuesBuffered); + private void unpack8Values(BytePackerForLong packer) throws IOException { + // get a single buffer of 8 values. most of the time, this won't require a copy + // TODO: update the packer to consume from an InputStream + ByteBuffer buffer = in.slice(packer.getBitWidth()); + packer.unpack8Values(buffer, buffer.position(), valuesBuffer, valuesBuffered); this.valuesBuffered += 8; - //sync the pos in stream - in.skip(packer.getBitWidth()); } private void readBitWidthsForMiniBlocks() { diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java index d810ba8110..e6ee1fd45d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java @@ -22,8 +22,10 @@ import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader; +import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,34 +40,38 @@ public class DeltaLengthByteArrayValuesReader extends ValuesReader { private static final Logger LOG = LoggerFactory.getLogger(DeltaLengthByteArrayValuesReader.class); private ValuesReader lengthReader; - private ByteBuffer in; - private int offset; + private ByteBufferInputStream in; public DeltaLengthByteArrayValuesReader() { this.lengthReader = new DeltaBinaryPackingValuesReader(); } @Override - public void initFromPage(int valueCount, ByteBuffer in, int offset) + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { - LOG.debug("init from page at offset {} for length {}", offset, (in.limit() - offset)); - lengthReader.initFromPage(valueCount, in, offset); - offset = lengthReader.getNextOffset(); - this.in = in; - this.offset = offset; + LOG.debug("init from page at offset {} for length {}", + stream.position(), stream.available()); + lengthReader.initFromPage(valueCount, stream); + this.in = stream.remainingStream(); } @Override public Binary readBytes() { int length = lengthReader.readInteger(); - int start = offset; - offset = start + length; - return Binary.fromConstantByteBuffer(in, start, length); + try { + return Binary.fromConstantByteBuffer(in.slice(length)); + } catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); + } } @Override public void skip() { int length = lengthReader.readInteger(); - offset = offset + length; + try { + in.skipFully(length); + } catch (IOException e) { + throw new ParquetDecodingException("Failed to skip " + length + " bytes", e); + } } }
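A usage sketch (illustrative only, not part of the patch) of the stream-slicing calls these readers now rely on: sliceStream consumes a fixed-length prefix from the parent stream and remainingStream consumes whatever is left, so chained readers naturally pick up where the previous one stopped. The byte values here are made up.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferInputStream;

class SliceDemo {
  static void demo() throws IOException {
    // an 8-byte "page" wrapped as a stream
    ByteBufferInputStream in =
        ByteBufferInputStream.wrap(ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5, 6, 7, 8}));

    ByteBufferInputStream head = in.sliceStream(4); // a stream over bytes 0-3
    assert in.position() == 4;                      // the parent advanced past the slice

    ByteBufferInputStream tail = in.remainingStream(); // bytes 4-7; consumes the rest
    assert in.available() == 0;
    assert tail.available() == 4;
  }
}
```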
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java index 742b515dc8..7a016277a4 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.values.RequiresPreviousReader; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader; @@ -46,13 +48,12 @@ public DeltaByteArrayReader() { } @Override - public void initFromPage(int valueCount, ByteBuffer page, int offset) + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { - prefixLengthReader.initFromPage(valueCount, page, offset); - int next = prefixLengthReader.getNextOffset(); - suffixReader.initFromPage(valueCount, page, next); + prefixLengthReader.initFromPage(valueCount, stream); + suffixReader.initFromPage(valueCount, stream); } - + @Override public void skip() { // read the next value to skip so that previous is correct. diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java index 19ff47c239..87edda6b7a 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java @@ -52,11 +52,12 @@ public DictionaryValuesReader(Dictionary dictionary) { } @Override - public void initFromPage(int valueCount, ByteBuffer page, int offset) + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { - this.in = new ByteBufferInputStream(page, offset, page.limit() - offset); - if (page.limit() - offset > 0) { - LOG.debug("init from page at offset {} for length {}", offset, (page.limit() - offset)); + LOG.debug("init from page at offset {} for length {}", + stream.position(), stream.available()); + this.in = stream.remainingStream(); + if (in.available() > 0) { int bitWidth = BytesUtils.readIntLittleEndianOnOneByte(in); LOG.debug("bit width {}", bitWidth); decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in); diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java index 0fa6cc66f2..0b8beb2859 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; import org.apache.parquet.Preconditions; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.Dictionary; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.values.plain.PlainValuesReader.DoublePlainValuesReader; @@ -150,10 +151,10 @@ public static class PlainLongDictionary extends PlainValuesDictionary { */ public PlainLongDictionary(DictionaryPage dictionaryPage) throws IOException {
super(dictionaryPage); - final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer(); + ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream(); longDictionaryContent = new long[dictionaryPage.getDictionarySize()]; LongPlainValuesReader longReader = new LongPlainValuesReader(); - longReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position()); + longReader.initFromPage(dictionaryPage.getDictionarySize(), in); for (int i = 0; i < longDictionaryContent.length; i++) { longDictionaryContent[i] = longReader.readLong(); } @@ -193,10 +194,10 @@ public static class PlainDoubleDictionary extends PlainValuesDictionary { */ public PlainDoubleDictionary(DictionaryPage dictionaryPage) throws IOException { super(dictionaryPage); - final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer(); + ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream(); doubleDictionaryContent = new double[dictionaryPage.getDictionarySize()]; DoublePlainValuesReader doubleReader = new DoublePlainValuesReader(); - doubleReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, 0); + doubleReader.initFromPage(dictionaryPage.getDictionarySize(), in); for (int i = 0; i < doubleDictionaryContent.length; i++) { doubleDictionaryContent[i] = doubleReader.readDouble(); } @@ -236,10 +237,10 @@ public static class PlainIntegerDictionary extends PlainValuesDictionary { */ public PlainIntegerDictionary(DictionaryPage dictionaryPage) throws IOException { super(dictionaryPage); - final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer(); + ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream(); intDictionaryContent = new int[dictionaryPage.getDictionarySize()]; IntegerPlainValuesReader intReader = new IntegerPlainValuesReader(); - intReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, 0); + intReader.initFromPage(dictionaryPage.getDictionarySize(), in); for (int i = 0; i < intDictionaryContent.length; i++) { intDictionaryContent[i] = intReader.readInteger(); } @@ -279,10 +280,10 @@ public static class PlainFloatDictionary extends PlainValuesDictionary { */ public PlainFloatDictionary(DictionaryPage dictionaryPage) throws IOException { super(dictionaryPage); - final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer(); + ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream(); floatDictionaryContent = new float[dictionaryPage.getDictionarySize()]; FloatPlainValuesReader floatReader = new FloatPlainValuesReader(); - floatReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position()); + floatReader.initFromPage(dictionaryPage.getDictionarySize(), in); for (int i = 0; i < floatDictionaryContent.length; i++) { floatDictionaryContent[i] = floatReader.readFloat(); } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java index 82e555134c..64113250e5 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java @@ -20,8 +20,8 @@ import java.io.IOException; -import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; import 
org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.io.ParquetDecodingException; @@ -31,40 +31,37 @@ public class BinaryPlainValuesReader extends ValuesReader { private static final Logger LOG = LoggerFactory.getLogger(BinaryPlainValuesReader.class); - private ByteBuffer in; - private int offset; + private ByteBufferInputStream in; @Override public Binary readBytes() { try { - int length = BytesUtils.readIntLittleEndian(in, offset); - int start = offset + 4; - offset = start + length; - return Binary.fromConstantByteBuffer(in, start, length); + int length = BytesUtils.readIntLittleEndian(in); + return Binary.fromConstantByteBuffer(in.slice(length)); } catch (IOException e) { - throw new ParquetDecodingException("could not read bytes at offset " + offset, e); + throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e); } catch (RuntimeException e) { - throw new ParquetDecodingException("could not read bytes at offset " + offset, e); + throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e); } } @Override public void skip() { try { - int length = BytesUtils.readIntLittleEndian(in, offset); - offset += 4 + length; + int length = BytesUtils.readIntLittleEndian(in); + in.skipFully(length); } catch (IOException e) { - throw new ParquetDecodingException("could not skip bytes at offset " + offset, e); + throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e); } catch (RuntimeException e) { - throw new ParquetDecodingException("could not skip bytes at offset " + offset, e); + throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e); } } @Override - public void initFromPage(int valueCount, ByteBuffer in, int offset) + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { - LOG.debug("init from page at offset {} for length {}", offset, (in.limit() - offset)); - this.in = in; - this.offset = offset; + LOG.debug("init from page at offset {} for length {}", + stream.position(), stream.available()); + this.in = stream.remainingStream(); } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java index 1f8fc2c35f..3296daadec 100755 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java @@ -21,8 +21,8 @@ import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN; import java.io.IOException; -import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader; import org.slf4j.Logger; @@ -60,17 +60,11 @@ public void skip() { /** * {@inheritDoc} - * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int valueCount, ByteBuffer page, int offset) + * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBufferInputStream) */ @Override - public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { - LOG.debug("init from page at offset {} for length {}", offset, (in.limit() - offset)); - this.in.initFromPage(valueCount, in, offset); + public void initFromPage(int valueCount,
ByteBufferInputStream stream) throws IOException { + LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available()); + this.in.initFromPage(valueCount, stream); } - - @Override - public int getNextOffset() { - return this.in.getNextOffset(); - } - } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java index 7a14f811ec..7738de7a3e 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; @@ -33,9 +34,9 @@ */ public class FixedLenByteArrayPlainValuesReader extends ValuesReader { private static final Logger LOG = LoggerFactory.getLogger(FixedLenByteArrayPlainValuesReader.class); - private ByteBuffer in; - private int offset; - private int length; + + private final int length; + private ByteBufferInputStream in; public FixedLenByteArrayPlainValuesReader(int length) { this.length = length; @@ -44,24 +45,26 @@ public FixedLenByteArrayPlainValuesReader(int length) { @Override public Binary readBytes() { try { - int start = offset; - offset = start + length; - return Binary.fromConstantByteBuffer(in, start, length); - } catch (RuntimeException e) { - throw new ParquetDecodingException("could not read bytes at offset " + offset, e); + return Binary.fromConstantByteBuffer(in.slice(length)); + } catch (IOException | RuntimeException e) { + throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e); } } @Override public void skip() { - offset += length; + try { + in.skipFully(length); + } catch (IOException | RuntimeException e) { + throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e); + } } @Override - public void initFromPage(int valueCount, ByteBuffer in, int offset) + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { - LOG.debug("init from page at offset {} for length {}", offset, (in.limit() - offset)); - this.in = in; - this.offset = offset; + LOG.debug("init from page at offset {} for length {}", + stream.position(), stream.available()); + this.in = stream.remainingStream(); } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java index e79cbb2e10..726f611a69 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java @@ -19,7 +19,6 @@ package org.apache.parquet.column.values.plain; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.LittleEndianDataInputStream; @@ -39,18 +38,10 @@ abstract public class PlainValuesReader extends ValuesReader { protected LittleEndianDataInputStream in; - /** - * {@inheritDoc} - * @see 
org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBuffer, int) - */ @Override - public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { - LOG.debug("init from page at offset {} for length {}", offset , (in.limit() - offset)); - this.in = new LittleEndianDataInputStream(toInputStream(in, offset)); - } - - private ByteBufferInputStream toInputStream(ByteBuffer in, int offset) { - return new ByteBufferInputStream(in.duplicate(), offset, in.limit() - offset); + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { + LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available()); + this.in = new LittleEndianDataInputStream(stream.remainingStream()); } public static class DoublePlainValuesReader extends PlainValuesReader { diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java index 6daa349ec8..d682a98086 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java @@ -22,9 +22,7 @@ import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; -import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.bitpacking.BytePacker; diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java index 4ccf2b86a3..ebfa76deb0 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java @@ -19,7 +19,6 @@ package org.apache.parquet.column.values.rle; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; @@ -35,26 +34,16 @@ public class RunLengthBitPackingHybridValuesReader extends ValuesReader { private final int bitWidth; private RunLengthBitPackingHybridDecoder decoder; - private int nextOffset; public RunLengthBitPackingHybridValuesReader(int bitWidth) { this.bitWidth = bitWidth; } @Override - public void initFromPage(int valueCountL, ByteBuffer page, int offset) throws IOException { - ByteBufferInputStream in = new ByteBufferInputStream(page, offset, page.limit() - offset); - int length = BytesUtils.readIntLittleEndian(in); - - decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in); - - // 4 is for the length which is stored as 4 bytes little endian - this.nextOffset = offset + length + 4; - } - - @Override - public int getNextOffset() { - return this.nextOffset; + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { + int length = BytesUtils.readIntLittleEndian(stream); + this.decoder = new RunLengthBitPackingHybridDecoder( + bitWidth, stream.sliceStream(length)); } @Override diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java index f8ff8d0d98..fe00de999e 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.values.ValuesReader; /** @@ -30,20 +31,12 @@ */ public class ZeroIntegerValuesReader extends ValuesReader { - private int nextOffset; - public int readInteger() { return 0; } @Override - public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { - this.nextOffset = offset; - } - - @Override - public int getNextOffset() { - return nextOffset; + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { } @Override diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java index 1f39d95b82..5bcbb884a9 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java @@ -21,6 +21,7 @@ import org.apache.parquet.CorruptDeltaByteArrays; import org.apache.parquet.SemanticVersion; import org.apache.parquet.VersionParser.ParsedVersion; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; @@ -100,13 +101,13 @@ public void testReassemblyWithCorruptPage() throws Exception { ByteBuffer corruptPageBytes = writer.getBytes().toByteBuffer(); DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader(); - firstPageReader.initFromPage(10, firstPageBytes, 0); + firstPageReader.initFromPage(10, ByteBufferInputStream.wrap(firstPageBytes)); for (int i = 0; i < 10; i += 1) { - assertEquals(firstPageReader.readBytes().toStringUsingUTF8(), str(i)); + assertEquals(str(i), firstPageReader.readBytes().toStringUsingUTF8()); } DeltaByteArrayReader corruptPageReader = new DeltaByteArrayReader(); - corruptPageReader.initFromPage(10, corruptPageBytes, 0); + corruptPageReader.initFromPage(10, ByteBufferInputStream.wrap(corruptPageBytes)); try { corruptPageReader.readBytes(); fail("Corrupt page did not throw an exception when read"); @@ -115,7 +116,7 @@ public void testReassemblyWithCorruptPage() throws Exception { } DeltaByteArrayReader secondPageReader = new DeltaByteArrayReader(); - secondPageReader.initFromPage(10, corruptPageBytes, 0); + secondPageReader.initFromPage(10, ByteBufferInputStream.wrap(corruptPageBytes)); secondPageReader.setPreviousReader(firstPageReader); for (int i = 10; i < 20; i += 1) { @@ -140,13 +141,13 @@ public void testReassemblyWithoutCorruption() throws Exception { ByteBuffer secondPageBytes = writer.getBytes().toByteBuffer(); DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader(); - firstPageReader.initFromPage(10, firstPageBytes, 0); + firstPageReader.initFromPage(10, ByteBufferInputStream.wrap(firstPageBytes)); for (int i = 0; i < 10; i += 1) { assertEquals(firstPageReader.readBytes().toStringUsingUTF8(), str(i)); } DeltaByteArrayReader secondPageReader = new DeltaByteArrayReader(); - secondPageReader.initFromPage(10, 
secondPageBytes, 0); + secondPageReader.initFromPage(10, ByteBufferInputStream.wrap(secondPageBytes)); secondPageReader.setPreviousReader(firstPageReader); for (int i = 10; i < 20; i += 1) { @@ -171,13 +172,13 @@ public void testOldReassemblyWithoutCorruption() throws Exception { ByteBuffer secondPageBytes = writer.getBytes().toByteBuffer(); DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader(); - firstPageReader.initFromPage(10, firstPageBytes, 0); + firstPageReader.initFromPage(10, ByteBufferInputStream.wrap(firstPageBytes)); for (int i = 0; i < 10; i += 1) { assertEquals(firstPageReader.readBytes().toStringUsingUTF8(), str(i)); } DeltaByteArrayReader secondPageReader = new DeltaByteArrayReader(); - secondPageReader.initFromPage(10, secondPageBytes, 0); + secondPageReader.initFromPage(10, ByteBufferInputStream.wrap(secondPageBytes)); for (int i = 10; i < 20; i += 1) { assertEquals(secondPageReader.readBytes().toStringUsingUTF8(), str(i)); diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java b/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java index 8caad2ba5c..248e0391e7 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.Random; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.io.api.Binary; /** @@ -59,33 +60,23 @@ public static void writeData(ValuesWriter writer, String[] strings) } } - public static Binary[] readData(ValuesReader reader, byte[] data, int offset, int length) + public static Binary[] readData(ValuesReader reader, ByteBufferInputStream stream, int length) throws IOException { Binary[] bins = new Binary[length]; - reader.initFromPage(length, ByteBuffer.wrap(data), 0); + reader.initFromPage(length, stream); for(int i=0; i < length; i++) { bins[i] = reader.readBytes(); } return bins; } - - public static Binary[] readData(ValuesReader reader, byte[] data, int length) - throws IOException { - return readData(reader, data, 0, length); - } - - public static int[] readInts(ValuesReader reader, byte[] data, int offset, int length) + + public static int[] readInts(ValuesReader reader, ByteBufferInputStream stream, int length) throws IOException { int[] ints = new int[length]; - reader.initFromPage(length, ByteBuffer.wrap(data), offset); + reader.initFromPage(length, stream); for(int i=0; i < length; i++) { ints[i] = reader.readInteger(); } return ints; } - - public static int[] readInts(ValuesReader reader, byte[] data, int length) - throws IOException { - return readInts(reader, data, 0, length); - } } diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java index 2733b7269b..656623cc16 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.bitpacking.BitPacking.BitPackingWriter; @@ -88,7 +89,7 @@ private static long readNTimes(byte[] bytes, int[] result, ValuesReader r) 
System.out.print(" no gc <"); for (int k = 0; k < N; k++) { long t2 = System.nanoTime(); - r.initFromPage(result.length, ByteBuffer.wrap(bytes), 0); + r.initFromPage(result.length, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes))); for (int i = 0; i < result.length; i++) { result[i] = r.readInteger(); } diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java index d83628a941..867af2876d 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.junit.Test; import org.apache.parquet.bytes.DirectByteBufferAllocator; @@ -175,7 +176,7 @@ private void validateEncodeDecode(int bitLength, int[] vals, String expected) th LOG.debug("bytes: {}", TestBitPacking.toString(bytes)); assertEquals(type.toString(), expected, TestBitPacking.toString(bytes)); ValuesReader r = type.getReader(bound); - r.initFromPage(vals.length, ByteBuffer.wrap(bytes), 0); + r.initFromPage(vals.length, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes))); int[] result = new int[vals.length]; for (int i = 0; i < result.length; i++) { result[i] = r.readInteger(); diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java index a3bec4a86d..ff4a308f59 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.Random; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.junit.Before; import org.junit.Test; @@ -143,7 +144,7 @@ public void shouldReadMaxMinValue() throws IOException { } @Test - public void shouldReturnCorrectOffsetAfterInitialization() throws IOException { + public void shouldConsumePageDataInInitialization() throws IOException { int[] data = new int[2 * blockSize + 3]; for (int i = 0; i < data.length; i++) { data[i] = i * 32; @@ -157,12 +158,14 @@ public void shouldReturnCorrectOffsetAfterInitialization() throws IOException { int contentOffsetInPage = 33; System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length); - //offset should be correct - reader.initFromPage(100, ByteBuffer.wrap(pageContent), contentOffsetInPage); - int offset= reader.getNextOffset(); + // offset should be correct + ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.wrap(pageContent)); + stream.skipFully(contentOffsetInPage); + reader.initFromPage(100, stream); + long offset = stream.position(); assertEquals(valueContent.length + contentOffsetInPage, offset); - //should be able to read data correclty + // should be able to read data correctly for (int i : data) { assertEquals(i, reader.readInteger()); } @@ -191,7 +194,7 @@ public void shouldSkip() throws IOException { } writeData(data); reader = new DeltaBinaryPackingValuesReader(); - reader.initFromPage(100, writer.getBytes().toByteBuffer(), 
0); + reader.initFromPage(100, writer.getBytes().toInputStream()); for (int i = 0; i < data.length; i++) { if (i % 3 == 0) { reader.skip(); @@ -247,7 +250,7 @@ private void shouldReadAndWrite(int[] data, int length) throws IOException { + blockFlushed * miniBlockNum //bitWidth of mini blocks + (5.0 * blockFlushed);//min delta for each block assertTrue(estimatedSize >= page.length); - reader.initFromPage(100, ByteBuffer.wrap(page), 0); + reader.initFromPage(100, ByteBufferInputStream.wrap(ByteBuffer.wrap(page))); for (int i = 0; i < length; i++) { assertEquals(data[i], reader.readInteger()); diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java index 34e1800ad7..795a591517 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.Random; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.junit.Before; import org.junit.Test; @@ -157,12 +158,14 @@ public void shouldReturnCorrectOffsetAfterInitialization() throws IOException { int contentOffsetInPage = 33; System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length); - //offset should be correct - reader.initFromPage(100, ByteBuffer.wrap(pageContent), contentOffsetInPage); - int offset = reader.getNextOffset(); + // offset should be correct + ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.wrap(pageContent)); + stream.skipFully(contentOffsetInPage); + reader.initFromPage(100, stream); + long offset = stream.position(); assertEquals(valueContent.length + contentOffsetInPage, offset); - //should be able to read data correclty + // should be able to read data correctly for (long i : data) { assertEquals(i, reader.readLong()); } @@ -190,7 +193,7 @@ public void shouldSkip() throws IOException { } writeData(data); reader = new DeltaBinaryPackingValuesReader(); - reader.initFromPage(100, writer.getBytes().toByteBuffer(), 0); + reader.initFromPage(100, writer.getBytes().toInputStream()); for (int i = 0; i < data.length; i++) { if (i % 3 == 0) { reader.skip(); @@ -244,7 +247,7 @@ private void shouldReadAndWrite(long[] data, int length) throws IOException { + blockFlushed * miniBlockNum //bitWidth of mini blocks + (10.0 * blockFlushed);//min delta for each block assertTrue(estimatedSize >= page.length); - reader.initFromPage(100, page, 0); + reader.initFromPage(100, ByteBufferInputStream.wrap(ByteBuffer.wrap(page))); for (int i = 0; i < length; i++) { assertEquals(data[i], reader.readLong()); diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java index 488208cf8f..ba5d77137b 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.Random; +import org.apache.parquet.bytes.ByteBufferInputStream; import 
org.apache.parquet.bytes.DirectByteBufferAllocator; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.ValuesWriter; @@ -91,7 +92,7 @@ public void readingRLE() throws IOException { } private void readData(ValuesReader reader, byte[] deltaBytes) throws IOException { - reader.initFromPage(data.length, ByteBuffer.wrap(deltaBytes), 0); + reader.initFromPage(data.length, ByteBufferInputStream.wrap(ByteBuffer.wrap(deltaBytes))); for (int i = 0; i < data.length; i++) { reader.readInteger(); } diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java index d7ebee502d..d214a88980 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java @@ -43,7 +43,7 @@ public void testSerialization () throws IOException { DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader(); Utils.writeData(writer, values); - Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), values.length); + Binary[] bin = Utils.readData(reader, writer.getBytes().toInputStream(), values.length); for(int i =0; i< bin.length ; i++) { Assert.assertEquals(Binary.fromString(values[i]), bin[i]); @@ -57,7 +57,7 @@ public void testRandomStrings() throws IOException { String[] values = Utils.getRandomStringSamples(1000, 32); Utils.writeData(writer, values); - Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), values.length); + Binary[] bin = Utils.readData(reader, writer.getBytes().toInputStream(), values.length); for(int i =0; i< bin.length ; i++) { Assert.assertEquals(Binary.fromString(values[i]), bin[i]); @@ -70,7 +70,7 @@ public void testLengths() throws IOException { ValuesReader reader = new DeltaBinaryPackingValuesReader(); Utils.writeData(writer, values); - int[] bin = Utils.readInts(reader, writer.getBytes().toByteArray(), values.length); + int[] bin = Utils.readInts(reader, writer.getBytes().toInputStream(), values.length); for(int i =0; i< bin.length ; i++) { Assert.assertEquals(values[i].length(), bin[i]); diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java index 69c5e15c6e..08d04e6d0d 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java @@ -20,6 +20,7 @@ import java.io.IOException; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.junit.Rule; import org.junit.Test; @@ -52,9 +53,9 @@ public void benchmarkRandomStringsWithPlainValuesWriter() throws IOException { BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); Utils.writeData(writer, values); - byte [] data = writer.getBytes().toByteArray(); + ByteBufferInputStream data = writer.getBytes().toInputStream(); Binary[] bin = Utils.readData(reader, data, values.length); - System.out.println("size " + data.length); + System.out.println("size " + data.position()); 
} @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @@ -64,9 +65,9 @@ public void benchmarkRandomStringsWithDeltaLengthByteArrayValuesWriter() throws DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader(); Utils.writeData(writer, values); - byte [] data = writer.getBytes().toByteArray(); + ByteBufferInputStream data = writer.getBytes().toInputStream(); Binary[] bin = Utils.readData(reader, data, values.length); - System.out.println("size " + data.length); + System.out.println("size " + data.position()); } } diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java index 4f8f40c179..c13a3a2b87 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.junit.Test; import org.junit.Assert; @@ -63,7 +64,7 @@ public void testLengths() throws IOException { ValuesReader reader = new DeltaBinaryPackingValuesReader(); Utils.writeData(writer, values); - byte[] data = writer.getBytes().toByteArray(); + ByteBufferInputStream data = writer.getBytes().toInputStream(); int[] bin = Utils.readInts(reader, data, values.length); // test prefix lengths @@ -71,9 +72,8 @@ public void testLengths() throws IOException { Assert.assertEquals(7, bin[1]); Assert.assertEquals(7, bin[2]); - int offset = reader.getNextOffset(); reader = new DeltaBinaryPackingValuesReader(); - bin = Utils.readInts(reader, writer.getBytes().toByteArray(), offset, values.length); + bin = Utils.readInts(reader, data, values.length); // test suffix lengths Assert.assertEquals(10, bin[0]); Assert.assertEquals(0, bin[1]); @@ -82,7 +82,7 @@ public void testLengths() throws IOException { private void assertReadWrite(DeltaByteArrayWriter writer, DeltaByteArrayReader reader, String[] vals) throws Exception { Utils.writeData(writer, vals); - Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), vals.length); + Binary[] bin = Utils.readData(reader, writer.getBytes().toInputStream(), vals.length); for(int i = 0; i< bin.length ; i++) { Assert.assertEquals(Binary.fromString(vals[i]), bin[i]); @@ -92,7 +92,7 @@ private void assertReadWrite(DeltaByteArrayWriter writer, DeltaByteArrayReader r private void assertReadWriteWithSkip(DeltaByteArrayWriter writer, DeltaByteArrayReader reader, String[] vals) throws Exception { Utils.writeData(writer, vals); - reader.initFromPage(vals.length, writer.getBytes().toByteBuffer(), 0); + reader.initFromPage(vals.length, writer.getBytes().toInputStream()); for (int i = 0; i < vals.length; i += 2) { Assert.assertEquals(Binary.fromString(vals[i]), reader.readBytes()); reader.skip(); diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java index eac4bd27e6..53578f0697 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java @@ -21,6 +21,7 @@ import 
java.io.IOException; import java.util.Arrays; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.junit.Rule; import org.junit.Test; @@ -59,9 +60,9 @@ public void benchmarkRandomStringsWithPlainValuesWriter() throws IOException { BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); Utils.writeData(writer, values); - byte [] data = writer.getBytes().toByteArray(); + ByteBufferInputStream data = writer.getBytes().toInputStream(); Binary[] bin = Utils.readData(reader, data, values.length); - System.out.println("size " + data.length); + System.out.println("size " + data.position()); } @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @@ -71,9 +72,9 @@ public void benchmarkRandomStringsWithDeltaLengthByteArrayValuesWriter() throws DeltaByteArrayReader reader = new DeltaByteArrayReader(); Utils.writeData(writer, values); - byte [] data = writer.getBytes().toByteArray(); + ByteBufferInputStream data = writer.getBytes().toInputStream(); Binary[] bin = Utils.readData(reader, data, values.length); - System.out.println("size " + data.length); + System.out.println("size " + data.position()); } @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @@ -83,9 +84,9 @@ public void benchmarkSortedStringsWithPlainValuesWriter() throws IOException { BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); Utils.writeData(writer, sortedVals); - byte [] data = writer.getBytes().toByteArray(); + ByteBufferInputStream data = writer.getBytes().toInputStream(); Binary[] bin = Utils.readData(reader, data, values.length); - System.out.println("size " + data.length); + System.out.println("size " + data.position()); } @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @@ -95,8 +96,8 @@ public void benchmarkSortedStringsWithDeltaLengthByteArrayValuesWriter() throws DeltaByteArrayReader reader = new DeltaByteArrayReader(); Utils.writeData(writer, sortedVals); - byte [] data = writer.getBytes().toByteArray(); + ByteBufferInputStream data = writer.getBytes().toInputStream(); Binary[] bin = Utils.readData(reader, data, values.length); - System.out.println("size " + data.length); + System.out.println("size " + data.position()); } } diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java index ada1c93cfb..cf669820f7 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java @@ -30,6 +30,7 @@ import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.junit.Assert; import org.junit.Test; @@ -118,7 +119,7 @@ public void testBinaryDictionaryFallBack() throws IOException { //Fallbacked to Plain encoding, therefore use PlainValuesReader to read it back ValuesReader reader = new BinaryPlainValuesReader(); - reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0); + reader.initFromPage(100, cw.getBytes().toInputStream()); for (long i = 0; i < 100; i++) { assertEquals(Binary.fromString("str" + i), reader.readBytes()); @@ -204,13 +205,13 @@ public void testLongDictionary() throws IOException { DictionaryValuesReader cr = initDicReader(cw, PrimitiveTypeName.INT64); - cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0); + cr.initFromPage(COUNT, bytes1.toInputStream()); for (long i = 0; i < COUNT; i++) { long back = 
cr.readLong();
      assertEquals(i % 50, back);
    }
-    cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
+    cr.initFromPage(COUNT2, bytes2.toInputStream());
    for (long i = COUNT2; i > 0; i--) {
      long back = cr.readLong();
      assertEquals(i % 50, back);
    }
@@ -228,7 +229,7 @@ private void roundTripLong(FallbackValuesWriter 0; i--) { double back = cr.readDouble(); assertEquals(i % 50, back, 0.0); }
@@ -299,7 +300,7 @@ private void roundTripDouble(FallbackValuesWriter 0; i--) { int back = cr.readInteger(); assertEquals(i % 50, back); }
@@ -370,7 +371,7 @@ private void roundTripInt(FallbackValuesWriter 0; i--) { float back = cr.readFloat(); assertEquals(i % 50, back, 0.0f); }
@@ -441,7 +442,7 @@ private void roundTripFloat(FallbackValuesWriter buffers) {
+    if (buffers.size() == 1) {
+      return new SingleBufferInputStream(buffers.get(0));
+    } else {
+      return new MultiBufferInputStream(buffers);
    }
-    //Workaround for unsigned byte
-    return byteBuf.get() & 0xFF;
  }

-  @Override
-  public int read(byte[] bytes, int offset, int length) throws IOException {
-    int count = Math.min(byteBuf.remaining(), length);
-    if (count == 0) return -1;
-    byteBuf.get(bytes, offset, count);
-    return count;
+  public abstract long position();
+
+  public void skipFully(long n) throws IOException {
+    long skipped = skip(n);
+    if (skipped < n) {
+      throw new EOFException(
+          "Not enough bytes to skip: " + skipped + " < " + n);
+    }
  }
-
-  @Override
-  public long skip(long n) {
-    if (n > byteBuf.remaining())
-      n = byteBuf.remaining();
-    int pos = byteBuf.position();
-    byteBuf.position((int)(pos + n));
-    return n;
+
+  public abstract int read(ByteBuffer out);
+
+  public abstract ByteBuffer slice(int length) throws EOFException;
+
+  public abstract List<ByteBuffer> sliceBuffers(long length) throws EOFException;
+
+  public ByteBufferInputStream sliceStream(long length) throws EOFException {
+    return ByteBufferInputStream.wrap(sliceBuffers(length));
  }

+  public abstract List<ByteBuffer> remainingBuffers();

-  @Override
-  public int available() {
-    return byteBuf.remaining();
+  public ByteBufferInputStream remainingStream() {
+    return ByteBufferInputStream.wrap(remainingBuffers());
  }
}
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
index 6e593c2409..1512a2483b 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
@@ -23,11 +23,11 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +46,6 @@
  */
abstract public class BytesInput {
  private static final Logger LOG = LoggerFactory.getLogger(BytesInput.class);
-  private static final boolean DEBUG = false;//Log.DEBUG;
  private static final EmptyBytesInput EMPTY_BYTES_INPUT = new EmptyBytesInput();

  /**
@@ -75,14 +74,27 @@ public static BytesInput concat(List<BytesInput> inputs) {
  public static BytesInput from(InputStream in, int bytes) {
    return new StreamBytesInput(in, bytes);
  }
-
+
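Note: the hunk below replaces the offset-based single-buffer factory with varargs and list factories, so callers no longer slice a shared buffer by hand. A sketch of the new call pattern, with made-up buffer contents (assumes java.nio.ByteBuffer and java.util.Arrays):

    List<ByteBuffer> parts = Arrays.asList(
        ByteBuffer.wrap(new byte[] { 0, 1, 2 }),
        ByteBuffer.wrap(new byte[] { 3, 4 }));
    BytesInput bytes = BytesInput.from(parts);        // BufferListBytesInput
    ByteBufferInputStream in = bytes.toInputStream(); // reads 0..4 across both buffers
    // bytes.size() == 5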
  /**
-   * @param buffer
-   * @param length number of bytes to read
-   * @return a BytesInput that will read the given bytes from the ByteBuffer
+   * @param buffers
+   * @return a BytesInput that will read the given bytes from the ByteBuffers
   */
-  public static BytesInput from(ByteBuffer buffer, int offset, int length) {
-    return new ByteBufferBytesInput(buffer, offset, length);
+  public static BytesInput from(ByteBuffer... buffers) {
+    if (buffers.length == 1) {
+      return new ByteBufferBytesInput(buffers[0]);
+    }
+    return new BufferListBytesInput(Arrays.asList(buffers));
+  }
+
+  /**
+   * @param buffers
+   * @return a BytesInput that will read the given bytes from the ByteBuffers
+   */
+  public static BytesInput from(List<ByteBuffer> buffers) {
+    if (buffers.size() == 1) {
+      return new ByteBufferBytesInput(buffers.get(0));
+    }
+    return new BufferListBytesInput(buffers);
  }

  /**
@@ -208,8 +220,8 @@ public ByteBuffer toByteBuffer() throws IOException {
   * @return a new InputStream materializing the contents of this input
   * @throws IOException
   */
-  public InputStream toInputStream() throws IOException {
-    return new ByteBufferInputStream(toByteBuffer());
+  public ByteBufferInputStream toInputStream() throws IOException {
+    return ByteBufferInputStream.wrap(toByteBuffer());
  }

  /**
@@ -439,7 +451,7 @@ public void writeAllTo(OutputStream out) throws IOException {
    }

    public ByteBuffer toByteBuffer() throws IOException {
-      return ByteBuffer.wrap(in, offset, length);
+      return java.nio.ByteBuffer.wrap(in, offset, length);
    }

    @Override
@@ -448,34 +460,31 @@ public long size() {
      return length;
    }
  }

-
-  private static class ByteBufferBytesInput extends BytesInput {
-
-    private final ByteBuffer byteBuf;
-    private final int length;
-    private final int offset;

-    private ByteBufferBytesInput(ByteBuffer byteBuf, int offset, int length) {
-      this.byteBuf = byteBuf;
-      this.offset = offset;
-      this.length = length;
+  private static class BufferListBytesInput extends BytesInput {
+    private final List<ByteBuffer> buffers;
+    private final long length;
+
+    public BufferListBytesInput(List<ByteBuffer> buffers) {
+      this.buffers = buffers;
+      long totalLen = 0;
+      for (ByteBuffer buffer : buffers) {
+        totalLen += buffer.remaining();
+      }
+      this.length = totalLen;
    }

    @Override
    public void writeAllTo(OutputStream out) throws IOException {
-      final WritableByteChannel outputChannel = Channels.newChannel(out);
-      byteBuf.position(offset);
-      ByteBuffer tempBuf = byteBuf.slice();
-      tempBuf.limit(length);
-      outputChannel.write(tempBuf);
+      WritableByteChannel channel = Channels.newChannel(out);
+      for (ByteBuffer buffer : buffers) {
+        channel.write(buffer.duplicate());
+      }
    }
-
    @Override
-    public ByteBuffer toByteBuffer() throws IOException {
-      byteBuf.position(offset);
-      ByteBuffer buf = byteBuf.slice();
-      buf.limit(length);
-      return buf;
+    public ByteBufferInputStream toInputStream() {
+      return ByteBufferInputStream.wrap(buffers);
    }

    @Override
@@ -483,4 +492,27 @@ public long size() {
      return length;
    }
  }
+
+  private static class ByteBufferBytesInput extends BytesInput {
+    private final ByteBuffer buffer;
+
+    private ByteBufferBytesInput(ByteBuffer buffer) {
+      this.buffer = buffer;
+    }
+
+    @Override
+    public void writeAllTo(OutputStream out) throws IOException {
+      Channels.newChannel(out).write(buffer.duplicate());
+    }
+
+    @Override
+    public ByteBufferInputStream toInputStream() {
+      return ByteBufferInputStream.wrap(buffer);
+    }
+
+    @Override
+    public long size() {
+      return buffer.remaining();
+    }
+  }
}
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java
new file mode 100644
index 0000000000..20a142bd8e
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+class MultiBufferInputStream extends ByteBufferInputStream {
+  private static final ByteBuffer EMPTY = ByteBuffer.allocate(0);
+
+  private final List<ByteBuffer> buffers;
+  private final long length;
+
+  private Iterator<ByteBuffer> iterator;
+  private ByteBuffer current = EMPTY;
+  private long position = 0;
+
+  private long mark = -1;
+  private long markLimit = 0;
+  private List<ByteBuffer> markBuffers = new ArrayList<>();
+
+  MultiBufferInputStream(List<ByteBuffer> buffers) {
+    this.buffers = buffers;
+
+    long totalLen = 0;
+    for (ByteBuffer buffer : buffers) {
+      totalLen += buffer.remaining();
+    }
+    this.length = totalLen;
+
+    this.iterator = buffers.iterator();
+
+    nextBuffer();
+  }
+
+  /**
+   * Returns the position in the stream.
+   */
+  public long position() {
+    return position;
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    if (n <= 0) {
+      return 0;
+    }
+
+    if (current == null) {
+      return -1;
+    }
+
+    long bytesSkipped = 0;
+    while (bytesSkipped < n) {
+      if (current.remaining() > 0) {
+        long bytesToSkip = Math.min(n - bytesSkipped, current.remaining());
+        current.position(current.position() + (int) bytesToSkip);
+        bytesSkipped += bytesToSkip;
+        this.position += bytesToSkip;
+      } else if (!nextBuffer()) {
+        // there are no more buffers
+        return bytesSkipped > 0 ? bytesSkipped : -1;
+      }
+    }
+
+    return bytesSkipped;
+  }
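Note: skip() above clamps at the end of the backing buffers and returns -1 once the stream is exhausted, while skipFully() in the base class turns a short skip into an EOFException. A small illustration over an assumed 8 bytes of data; the varargs form of the wrap() factory is an assumption here, wrap(Arrays.asList(...)) would do the same:

    ByteBufferInputStream in = ByteBufferInputStream.wrap(
        ByteBuffer.wrap(new byte[] { 0, 1, 2, 3, 4 }),
        ByteBuffer.wrap(new byte[] { 5, 6, 7 }));
    in.skipFully(6);        // crosses the buffer boundary; position() == 6
    long n = in.skip(100);  // clamps at end of stream: n == 2
    // a further in.skipFully(1) would now throw EOFException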
+  @Override
+  public int read(ByteBuffer out) {
+    int len = out.remaining();
+    if (len <= 0) {
+      return 0;
+    }
+
+    if (current == null) {
+      return -1;
+    }
+
+    int bytesCopied = 0;
+    while (bytesCopied < len) {
+      if (current.remaining() > 0) {
+        int bytesToCopy;
+        ByteBuffer copyBuffer;
+        if (current.remaining() <= out.remaining()) {
+          // copy all of the current buffer
+          bytesToCopy = current.remaining();
+          copyBuffer = current;
+        } else {
+          // copy a slice of the current buffer
+          bytesToCopy = out.remaining();
+          copyBuffer = current.duplicate();
+          copyBuffer.limit(copyBuffer.position() + bytesToCopy);
+          current.position(copyBuffer.position() + bytesToCopy);
+        }
+
+        out.put(copyBuffer);
+        bytesCopied += bytesToCopy;
+        this.position += bytesToCopy;
+
+      } else if (!nextBuffer()) {
+        // there are no more buffers
+        return bytesCopied > 0 ? bytesCopied : -1;
+      }
+    }
+
+    return bytesCopied;
+  }
+
+  @Override
+  public ByteBuffer slice(int length) throws EOFException {
+    if (length <= 0) {
+      return EMPTY;
+    }
+
+    if (current == null) {
+      throw new EOFException();
+    }
+
+    ByteBuffer slice;
+    if (length > current.remaining()) {
+      // a copy is needed to return a single buffer
+      // TODO: use an allocator
+      slice = ByteBuffer.allocate(length);
+      int bytesCopied = read(slice);
+      slice.flip();
+      if (bytesCopied < length) {
+        throw new EOFException();
+      }
+    } else {
+      slice = current.duplicate();
+      slice.limit(slice.position() + length);
+      current.position(slice.position() + length);
+      this.position += length;
+    }
+
+    return slice;
+  }
+
+  public List<ByteBuffer> sliceBuffers(long len) throws EOFException {
+    if (len <= 0) {
+      return Collections.emptyList();
+    }
+
+    if (current == null) {
+      throw new EOFException();
+    }
+
+    List<ByteBuffer> buffers = new ArrayList<>();
+    long bytesAccumulated = 0;
+    while (bytesAccumulated < len) {
+      if (current.remaining() > 0) {
+        // get a slice of the current buffer to return
+        // always fits in an int because remaining returns an int that is >= 0
+        int bufLen = (int) Math.min(len - bytesAccumulated, current.remaining());
+        ByteBuffer slice = current.duplicate();
+        slice.limit(slice.position() + bufLen);
+        buffers.add(slice);
+        bytesAccumulated += bufLen;
+
+        // update state; the bytes are considered read
+        current.position(current.position() + bufLen);
+        this.position += bufLen;
+      } else if (!nextBuffer()) {
+        // there are no more buffers
+        throw new EOFException();
+      }
+    }
+
+    return buffers;
+  }
+
+  @Override
+  public List<ByteBuffer> remainingBuffers() {
+    if (position >= length) {
+      return Collections.emptyList();
+    }
+
+    try {
+      return sliceBuffers(length - position);
+    } catch (EOFException e) {
+      throw new RuntimeException(
+          "[Parquet bug] Stream is bad: incorrect bytes remaining " +
+          (length - position));
+    }
+  }
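Note: sliceBuffers() hands back duplicates of the underlying buffers wherever a slice does not cross a buffer boundary, so a region can be re-wrapped without copying; this is the pattern the ParquetFileReader changes later in this patch use to read a page out of a chunk. Sketched here with a hypothetical pageSize and a stream `in`:

    List<ByteBuffer> page = in.sliceBuffers(pageSize); // advances in by pageSize
    BytesInput pageBytes = BytesInput.from(page);      // zero-copy view of the page
    ByteBufferInputStream rest = in.remainingStream(); // everything after the page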
+  @Override
+  public int read(byte[] bytes, int off, int len) {
+    if (len <= 0) {
+      if (len < 0) {
+        throw new IndexOutOfBoundsException("Read length must be greater than 0: " + len);
+      }
+      return 0;
+    }
+
+    if (current == null) {
+      return -1;
+    }
+
+    int bytesRead = 0;
+    while (bytesRead < len) {
+      if (current.remaining() > 0) {
+        int bytesToRead = Math.min(len - bytesRead, current.remaining());
+        current.get(bytes, off + bytesRead, bytesToRead);
+        bytesRead += bytesToRead;
+        this.position += bytesToRead;
+      } else if (!nextBuffer()) {
+        // there are no more buffers
+        return bytesRead > 0 ? bytesRead : -1;
+      }
+    }
+
+    return bytesRead;
+  }
+
+  @Override
+  public int read(byte[] bytes) {
+    return read(bytes, 0, bytes.length);
+  }
+
+  @Override
+  public int read() throws IOException {
+    if (current == null) {
+      throw new EOFException();
+    }
+
+    while (true) {
+      if (current.remaining() > 0) {
+        this.position += 1;
+        return current.get() & 0xFF; // as unsigned
+      } else if (!nextBuffer()) {
+        // there are no more buffers
+        throw new EOFException();
+      }
+    }
+  }
+
+  @Override
+  public int available() {
+    long remaining = length - position;
+    if (remaining > Integer.MAX_VALUE) {
+      return Integer.MAX_VALUE;
+    } else {
+      return (int) remaining;
+    }
+  }
+
+  @Override
+  public void mark(int readlimit) {
+    if (mark >= 0) {
+      discardMark();
+    }
+    this.mark = position;
+    this.markLimit = mark + readlimit + 1;
+    if (current != null) {
+      markBuffers.add(current.duplicate());
+    }
+  }
+
+  @Override
+  public void reset() throws IOException {
+    if (mark >= 0 && position < markLimit) {
+      this.position = mark;
+      // replace the current iterator with one that adds back the buffers that
+      // have been used since mark was called.
+      this.iterator = concat(markBuffers.iterator(), iterator);
+      discardMark();
+      nextBuffer(); // go back to the marked buffers
+    } else {
+      throw new IOException("No mark defined or has read past the previous mark limit");
+    }
+  }
+
+  private void discardMark() {
+    this.mark = -1;
+    this.markLimit = 0;
+    markBuffers = new ArrayList<>();
+  }
+
+  @Override
+  public boolean markSupported() {
+    return true;
+  }
+
+  private boolean nextBuffer() {
+    if (!iterator.hasNext()) {
+      this.current = null;
+      return false;
+    }
+
+    this.current = iterator.next().duplicate();
+
+    if (mark >= 0) {
+      if (position < markLimit) {
+        // the mark is defined and valid. save the new buffer
+        markBuffers.add(current.duplicate());
+      } else {
+        // the mark has not been used and is no longer valid
+        discardMark();
+      }
+    }
+
+    return true;
+  }
+
+  private static <E> Iterator<E> concat(Iterator<E> first, Iterator<E> second) {
+    return new ConcatIterator<>(first, second);
+  }
+
+  private static class ConcatIterator<E> implements Iterator<E> {
+    private final Iterator<E> first;
+    private final Iterator<E> second;
+    boolean useFirst = true;
+
+    public ConcatIterator(Iterator<E> first, Iterator<E> second) {
+      this.first = first;
+      this.second = second;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (useFirst) {
+        if (first.hasNext()) {
+          return true;
+        } else {
+          useFirst = false;
+          return second.hasNext();
+        }
+      }
+      return second.hasNext();
+    }
+
+    @Override
+    public E next() {
+      if (useFirst && !first.hasNext()) {
+        useFirst = false;
+      }
+
+      if (!useFirst && !second.hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      if (useFirst) {
+        return first.next();
+      }
+
+      return second.next();
+    }
+
+    @Override
+    public void remove() {
+      if (useFirst) {
+        first.remove();
+      } else {
+        second.remove();
+      }
+    }
+  }
+}
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java
new file mode 100644
index 0000000000..999d1bb4f6
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.bytes; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; + +/** + * This ByteBufferInputStream does not consume the ByteBuffer being passed in, + * but will create a slice of the current buffer. + */ +class SingleBufferInputStream extends ByteBufferInputStream { + + private final ByteBuffer buffer; + private final long startPosition; + private int mark = -1; + + SingleBufferInputStream(ByteBuffer buffer) { + // duplicate the buffer because its state will be modified + this.buffer = buffer.duplicate(); + this.startPosition = buffer.position(); + } + + @Override + public long position() { + // position is relative to the start of the stream, not the buffer + return buffer.position() - startPosition; + } + + @Override + public int read() throws IOException { + if (!buffer.hasRemaining()) { + throw new EOFException(); + } + return buffer.get() & 0xFF; // as unsigned + } + + @Override + public int read(byte[] bytes, int offset, int length) throws IOException { + if (length == 0) { + return 0; + } + + int remaining = buffer.remaining(); + if (remaining <= 0) { + return -1; + } + + int bytesToRead = Math.min(buffer.remaining(), length); + buffer.get(bytes, offset, bytesToRead); + + return bytesToRead; + } + + @Override + public long skip(long n) { + if (n == 0) { + return 0; + } + + if (buffer.remaining() <= 0) { + return -1; + } + + // buffer.remaining is an int, so this will always fit in an int + int bytesToSkip = (int) Math.min(buffer.remaining(), n); + buffer.position(buffer.position() + bytesToSkip); + + return bytesToSkip; + } + + @Override + public int read(ByteBuffer out) { + int bytesToCopy; + ByteBuffer copyBuffer; + if (buffer.remaining() <= out.remaining()) { + // copy all of the buffer + bytesToCopy = buffer.remaining(); + copyBuffer = buffer; + } else { + // copy a slice of the current buffer + bytesToCopy = out.remaining(); + copyBuffer = buffer.duplicate(); + copyBuffer.limit(buffer.position() + bytesToCopy); + buffer.position(buffer.position() + bytesToCopy); + } + + out.put(copyBuffer); + out.flip(); + + return bytesToCopy; + } + + @Override + public ByteBuffer slice(int length) throws EOFException { + if (buffer.remaining() < length) { + throw new EOFException(); + } + + // length is less than remaining, so it must fit in an int + ByteBuffer copy = buffer.duplicate(); + copy.limit(copy.position() + length); + buffer.position(buffer.position() + length); + + return copy; + } + + @Override + public List sliceBuffers(long length) throws EOFException { + if (length == 0) { + return Collections.emptyList(); + } + + if (length > buffer.remaining()) { + throw new EOFException(); + } + + // length is less than remaining, so it must fit in an int + return Collections.singletonList(slice((int) length)); + } + + @Override + public List remainingBuffers() { + if (buffer.remaining() 
<= 0) { + return Collections.emptyList(); + } + + ByteBuffer remaining = buffer.duplicate(); + buffer.position(buffer.limit()); + + return Collections.singletonList(remaining); + } + + @Override + public void mark(int readlimit) { + this.mark = buffer.position(); + } + + @Override + public void reset() throws IOException { + if (mark >= 0) { + buffer.position(mark); + this.mark = -1; + } else { + throw new IOException("No mark defined"); + } + } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public int available() { + return buffer.remaining(); + } +} diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java new file mode 100644 index 0000000000..7bed2a9d28 --- /dev/null +++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java @@ -0,0 +1,597 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import org.junit.Assert; +import org.junit.Test; +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; + +public abstract class TestByteBufferInputStreams { + + protected abstract ByteBufferInputStream newStream(); + protected abstract void checkOriginalData(); + + @Test + public void testRead0() throws Exception { + byte[] bytes = new byte[0]; + + ByteBufferInputStream stream = newStream(); + + Assert.assertEquals("Should read 0 bytes", 0, stream.read(bytes)); + + int bytesRead = stream.read(new byte[100]); + Assert.assertTrue("Should read to end of stream", bytesRead < 100); + + Assert.assertEquals("Should read 0 bytes at end of stream", + 0, stream.read(bytes)); + } + + @Test + public void testReadAll() throws Exception { + byte[] bytes = new byte[35]; + + ByteBufferInputStream stream = newStream(); + + int bytesRead = stream.read(bytes); + Assert.assertEquals("Should read the entire buffer", + bytes.length, bytesRead); + + for (int i = 0; i < bytes.length; i += 1) { + Assert.assertEquals("Byte i should be i", i, bytes[i]); + Assert.assertEquals("Should advance position", 35, stream.position()); + } + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + + Assert.assertEquals("Should return -1 at end of stream", + -1, stream.read(bytes)); + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + + checkOriginalData(); + } + + @Test + public void testSmallReads() throws Exception { + for (int size = 1; size < 36; size += 1) { + byte[] bytes = new byte[size]; + + 
ByteBufferInputStream stream = newStream(); + long length = stream.available(); + + int lastBytesRead = bytes.length; + for (int offset = 0; offset < length; offset += bytes.length) { + Assert.assertEquals("Should read requested len", + bytes.length, lastBytesRead); + + lastBytesRead = stream.read(bytes, 0, bytes.length); + + Assert.assertEquals("Should advance position", + offset + lastBytesRead, stream.position()); + + // validate the bytes that were read + for (int i = 0; i < lastBytesRead; i += 1) { + Assert.assertEquals("Byte i should be i", offset + i, bytes[i]); + } + } + + Assert.assertEquals("Should read fewer bytes at end of buffer", + length % bytes.length, lastBytesRead % bytes.length); + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + + Assert.assertEquals("Should return -1 at end of stream", + -1, stream.read(bytes)); + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + } + + checkOriginalData(); + } + + @Test + public void testPartialBufferReads() throws Exception { + for (int size = 1; size < 35; size += 1) { + byte[] bytes = new byte[33]; + + ByteBufferInputStream stream = newStream(); + + int lastBytesRead = size; + for (int offset = 0; offset < bytes.length; offset += size) { + Assert.assertEquals("Should read requested len", size, lastBytesRead); + + lastBytesRead = stream.read( + bytes, offset, Math.min(size, bytes.length - offset)); + + Assert.assertEquals("Should advance position", + lastBytesRead > 0 ? offset + lastBytesRead : offset, + stream.position()); + } + + Assert.assertEquals("Should read fewer bytes at end of buffer", + bytes.length % size, lastBytesRead % size); + + for (int i = 0; i < bytes.length; i += 1) { + Assert.assertEquals("Byte i should be i", i, bytes[i]); + } + + Assert.assertEquals("Should have no more remaining content", + 2, stream.available()); + + Assert.assertEquals("Should return 2 more bytes", + 2, stream.read(bytes)); + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + + Assert.assertEquals("Should return -1 at end of stream", + -1, stream.read(bytes)); + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + } + + checkOriginalData(); + } + + @Test + public void testReadByte() throws Exception { + final ByteBufferInputStream stream = newStream(); + int length = stream.available(); + + for (int i = 0; i < length; i += 1) { + Assert.assertEquals("Position should increment", i, stream.position()); + Assert.assertEquals(i, stream.read()); + } + + assertThrows("Should throw EOFException at end of stream", + EOFException.class, new Callable() { + @Override + public Integer call() throws IOException { + return stream.read(); + } + }); + + checkOriginalData(); + } + + @Test + public void testSlice() throws Exception { + ByteBufferInputStream stream = newStream(); + int length = stream.available(); + + ByteBuffer empty = stream.slice(0); + Assert.assertNotNull("slice(0) should produce a non-null buffer", empty); + Assert.assertEquals("slice(0) should produce an empty buffer", + 0, empty.remaining()); + + Assert.assertEquals("Position should be at start", 0, stream.position()); + + int i = 0; + while (stream.available() > 0) { + int bytesToSlice = Math.min(stream.available(), 10); + ByteBuffer buffer = stream.slice(bytesToSlice); + + for (int j = 0; j < bytesToSlice; j += 1) { + Assert.assertEquals("Data should be correct", i + j, buffer.get()); + } + + i += bytesToSlice; + } + + 
Assert.assertEquals("Position should be at end", length, stream.position()); + + checkOriginalData(); + } + + @Test + public void testSliceBuffers0() throws Exception { + ByteBufferInputStream stream = newStream(); + + Assert.assertEquals("Should return an empty list", + Collections.emptyList(), stream.sliceBuffers(0)); + } + + @Test + public void testWholeSliceBuffers() throws Exception { + final ByteBufferInputStream stream = newStream(); + final int length = stream.available(); + + List buffers = stream.sliceBuffers(stream.available()); + + Assert.assertEquals("Should consume all buffers", length, stream.position()); + + assertThrows("Should throw EOFException when empty", + EOFException.class, new Callable>() { + @Override + public List call() throws Exception { + return stream.sliceBuffers(length); + } + }); + + ByteBufferInputStream copy = ByteBufferInputStream.wrap(buffers); + for (int i = 0; i < length; i += 1) { + Assert.assertEquals("Slice should have identical data", i, copy.read()); + } + + checkOriginalData(); + } + + @Test + public void testSliceBuffersCoverage() throws Exception { + for (int size = 1; size < 36; size += 1) { + ByteBufferInputStream stream = newStream(); + int length = stream.available(); + + List buffers = new ArrayList<>(); + while (stream.available() > 0) { + buffers.addAll(stream.sliceBuffers(Math.min(size, stream.available()))); + } + + Assert.assertEquals("Should consume all content", + length, stream.position()); + + ByteBufferInputStream newStream = new MultiBufferInputStream(buffers); + + for (int i = 0; i < length; i += 1) { + Assert.assertEquals("Data should be correct", i, newStream.read()); + } + } + + checkOriginalData(); + } + + @Test + public void testSliceBuffersModification() throws Exception { + ByteBufferInputStream stream = newStream(); + int length = stream.available(); + + int sliceLength = 5; + List buffers = stream.sliceBuffers(sliceLength); + Assert.assertEquals("Should advance the original stream", + length - sliceLength, stream.available()); + Assert.assertEquals("Should advance the original stream position", + sliceLength, stream.position()); + + Assert.assertEquals("Should return a slice of the first buffer", + 1, buffers.size()); + + ByteBuffer buffer = buffers.get(0); + Assert.assertEquals("Should have requested bytes", + sliceLength, buffer.remaining()); + + // read the buffer one past the returned limit. 
this should not change the + // next value in the original stream + buffer.limit(sliceLength + 1); + for (int i = 0; i < sliceLength + 1; i += 1) { + Assert.assertEquals("Should have correct data", i, buffer.get()); + } + + Assert.assertEquals("Reading a slice shouldn't advance the original stream", + sliceLength, stream.position()); + Assert.assertEquals("Reading a slice shouldn't change the underlying data", + sliceLength, stream.read()); + + // change the underlying data buffer + buffer.limit(sliceLength + 2); + int originalValue = buffer.duplicate().get(); + ByteBuffer undoBuffer = buffer.duplicate(); + + try { + buffer.put((byte) 255); + + Assert.assertEquals( + "Writing to a slice shouldn't advance the original stream", + sliceLength + 1, stream.position()); + Assert.assertEquals( + "Writing to a slice should change the underlying data", + 255, stream.read()); + + } finally { + undoBuffer.put((byte) originalValue); + } + } + + @Test + public void testSkip() throws Exception { + ByteBufferInputStream stream = newStream(); + + while (stream.available() > 0) { + int bytesToSkip = Math.min(stream.available(), 10); + Assert.assertEquals("Should skip all, regardless of backing buffers", + bytesToSkip, stream.skip(bytesToSkip)); + } + + stream = newStream(); + Assert.assertEquals(0, stream.skip(0)); + + int length = stream.available(); + Assert.assertEquals("Should stop at end when out of bytes", + length, stream.skip(length + 10)); + Assert.assertEquals("Should return -1 when at end", + -1, stream.skip(10)); + } + + @Test + public void testSkipFully() throws Exception { + ByteBufferInputStream stream = newStream(); + + long lastPosition = 0; + while (stream.available() > 0) { + int bytesToSkip = Math.min(stream.available(), 10); + + stream.skipFully(bytesToSkip); + + Assert.assertEquals("Should skip all, regardless of backing buffers", + bytesToSkip, stream.position() - lastPosition); + + lastPosition = stream.position(); + } + + final ByteBufferInputStream stream2 = newStream(); + stream2.skipFully(0); + Assert.assertEquals(0, stream2.position()); + + final int length = stream2.available(); + assertThrows("Should throw when out of bytes", + EOFException.class, new Callable() { + @Override + public Object call() throws Exception { + stream2.skipFully(length + 10); + return null; + } + }); + } + + @Test + public void testMark() throws Exception { + ByteBufferInputStream stream = newStream(); + + stream.read(new byte[7]); + stream.mark(100); + + long mark = stream.position(); + + byte[] expected = new byte[100]; + int expectedBytesRead = stream.read(expected); + + long end = stream.position(); + + stream.reset(); + + Assert.assertEquals("Position should return to the mark", + mark, stream.position()); + + byte[] afterReset = new byte[100]; + int bytesReadAfterReset = stream.read(afterReset); + + Assert.assertEquals("Should read the same number of bytes", + expectedBytesRead, bytesReadAfterReset); + + Assert.assertEquals("Read should end at the same position", + end, stream.position()); + + Assert.assertArrayEquals("Content should be equal", expected, afterReset); + } + + @Test + public void testMarkTwice() throws Exception { + ByteBufferInputStream stream = newStream(); + + stream.read(new byte[7]); + stream.mark(1); + stream.mark(100); + + long mark = stream.position(); + + byte[] expected = new byte[100]; + int expectedBytesRead = stream.read(expected); + + long end = stream.position(); + + stream.reset(); + + Assert.assertEquals("Position should return to the mark", + mark, 
stream.position()); + + byte[] afterReset = new byte[100]; + int bytesReadAfterReset = stream.read(afterReset); + + Assert.assertEquals("Should read the same number of bytes", + expectedBytesRead, bytesReadAfterReset); + + Assert.assertEquals("Read should end at the same position", + end, stream.position()); + + Assert.assertArrayEquals("Content should be equal", expected, afterReset); + } + + @Test + public void testMarkAtStart() throws Exception { + ByteBufferInputStream stream = newStream(); + + stream.mark(100); + + long mark = stream.position(); + + byte[] expected = new byte[10]; + Assert.assertEquals("Should read 10 bytes", 10, stream.read(expected)); + + long end = stream.position(); + + stream.reset(); + + Assert.assertEquals("Position should return to the mark", + mark, stream.position()); + + byte[] afterReset = new byte[10]; + Assert.assertEquals("Should read 10 bytes", 10, stream.read(afterReset)); + + Assert.assertEquals("Read should end at the same position", + end, stream.position()); + + Assert.assertArrayEquals("Content should be equal", expected, afterReset); + } + + @Test + public void testMarkAtEnd() throws Exception { + ByteBufferInputStream stream = newStream(); + + int bytesRead = stream.read(new byte[100]); + Assert.assertTrue("Should read to end of stream", bytesRead < 100); + + stream.mark(100); + + long mark = stream.position(); + + byte[] expected = new byte[10]; + Assert.assertEquals("Should read 0 bytes", -1, stream.read(expected)); + + long end = stream.position(); + + stream.reset(); + + Assert.assertEquals("Position should return to the mark", + mark, stream.position()); + + byte[] afterReset = new byte[10]; + Assert.assertEquals("Should read 0 bytes", -1, stream.read(afterReset)); + + Assert.assertEquals("Read should end at the same position", + end, stream.position()); + + Assert.assertArrayEquals("Content should be equal", expected, afterReset); + } + + @Test + public void testMarkUnset() { + final ByteBufferInputStream stream = newStream(); + + assertThrows("Should throw an error for reset() without mark()", + IOException.class, new Callable() { + @Override + public Object call() throws Exception { + stream.reset(); + return null; + } + }); + } + + @Test + public void testMarkAndResetTwiceOverSameRange() throws Exception { + final ByteBufferInputStream stream = newStream(); + + byte[] expected = new byte[6]; + stream.mark(10); + Assert.assertEquals("Should read expected bytes", + expected.length, stream.read(expected)); + + stream.reset(); + stream.mark(10); + + byte[] firstRead = new byte[6]; + Assert.assertEquals("Should read firstRead bytes", + firstRead.length, stream.read(firstRead)); + + stream.reset(); + + byte[] secondRead = new byte[6]; + Assert.assertEquals("Should read secondRead bytes", + secondRead.length, stream.read(secondRead)); + + Assert.assertArrayEquals("First read should be correct", + expected, firstRead); + + Assert.assertArrayEquals("Second read should be correct", + expected, secondRead); + } + + @Test + public void testMarkLimit() throws Exception { + final ByteBufferInputStream stream = newStream(); + + stream.mark(5); + Assert.assertEquals("Should read 5 bytes", 5, stream.read(new byte[5])); + + stream.reset(); + + Assert.assertEquals("Should read 6 bytes", 6, stream.read(new byte[6])); + + assertThrows("Should throw an error for reset() after limit", + IOException.class, new Callable() { + @Override + public Object call() throws Exception { + stream.reset(); + return null; + } + }); + } + + @Test + public void 
testMarkDoubleReset() throws Exception {
+    final ByteBufferInputStream stream = newStream();
+
+    stream.mark(5);
+    Assert.assertEquals("Should read 5 bytes", 5, stream.read(new byte[5]));
+
+    stream.reset();
+
+    assertThrows("Should throw an error for double reset()",
+        IOException.class, new Callable<Object>() {
+          @Override
+          public Object call() throws Exception {
+            stream.reset();
+            return null;
+          }
+        });
+  }
+
+  /**
+   * A convenience method to avoid a large number of @Test(expected=...) tests
+   * @param message A String message to describe this assertion
+   * @param expected An Exception class that the Callable should throw
+   * @param callable A Callable that is expected to throw the exception
+   */
+  public static void assertThrows(
+      String message, Class<? extends Exception> expected, Callable<?> callable) {
+    try {
+      callable.call();
+      Assert.fail("No exception was thrown (" + message + "), expected: " +
+          expected.getName());
+    } catch (Exception actual) {
+      try {
+        Assert.assertEquals(message, expected, actual.getClass());
+      } catch (AssertionError e) {
+        e.addSuppressed(actual);
+        throw e;
+      }
+    }
+  }
+}
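Note: the mark/reset tests above pin down the java.io.InputStream contract both implementations must honor; reset() returns the stream to the marked position as long as no more than readlimit bytes were read in between. The core scenario from testMark, in outline:

    ByteBufferInputStream stream = newStream(); // either implementation under test
    stream.read(new byte[7]);
    stream.mark(100);             // remember position 7
    byte[] first = new byte[10];
    stream.read(first);
    stream.reset();               // back to position 7
    byte[] second = new byte[10];
    stream.read(second);          // re-reads the same bytes; first equals second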
diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestMultiBufferInputStream.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestMultiBufferInputStream.java
new file mode 100644
index 0000000000..253c986ca6
--- /dev/null
+++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestMultiBufferInputStream.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import org.junit.Assert;
+import org.junit.Test;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestMultiBufferInputStream extends TestByteBufferInputStreams {
+  private static final List<ByteBuffer> DATA = Arrays.asList(
+      ByteBuffer.wrap(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8 }),
+      ByteBuffer.wrap(new byte[] { 9, 10, 11, 12 }),
+      ByteBuffer.wrap(new byte[] { }),
+      ByteBuffer.wrap(new byte[] { 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24 }),
+      ByteBuffer.wrap(new byte[] { 25 }),
+      ByteBuffer.wrap(new byte[] { 26, 27, 28, 29, 30, 31, 32 }),
+      ByteBuffer.wrap(new byte[] { 33, 34 })
+  );
+
+  @Override
+  protected ByteBufferInputStream newStream() {
+    return new MultiBufferInputStream(DATA);
+  }
+
+  @Override
+  protected void checkOriginalData() {
+    for (ByteBuffer buffer : DATA) {
+      Assert.assertEquals("Position should not change", 0, buffer.position());
+      Assert.assertEquals("Limit should not change",
+          buffer.array().length, buffer.limit());
+    }
+  }
+
+  @Test
+  public void testSliceData() throws Exception {
+    ByteBufferInputStream stream = newStream();
+    int length = stream.available();
+
+    List<ByteBuffer> buffers = new ArrayList<>();
+    // slice the stream into 4 8-byte buffers and 1 3-byte buffer
+    while (stream.available() > 0) {
+      int bytesToSlice = Math.min(stream.available(), 8);
+      buffers.add(stream.slice(bytesToSlice));
+    }
+
+    Assert.assertEquals("Position should be at end", length, stream.position());
+    Assert.assertEquals("Should produce 5 buffers", 5, buffers.size());
+
+    int i = 0;
+
+    // one is a view of the first buffer because it is smaller
+    ByteBuffer one = buffers.get(0);
+    Assert.assertSame("Should be a duplicate of the first array",
+        one.array(), DATA.get(0).array());
+    Assert.assertEquals(8, one.remaining());
+    Assert.assertEquals(0, one.position());
+    Assert.assertEquals(8, one.limit());
+    Assert.assertEquals(9, one.capacity());
+    for (; i < 8; i += 1) {
+      Assert.assertEquals("Should produce correct values", i, one.get());
+    }
+
+    // two should be a copy of the next 8 bytes
+    ByteBuffer two = buffers.get(1);
+    Assert.assertEquals(8, two.remaining());
+    Assert.assertEquals(0, two.position());
+    Assert.assertEquals(8, two.limit());
+    Assert.assertEquals(8, two.capacity());
+    for (; i < 16; i += 1) {
+      Assert.assertEquals("Should produce correct values", i, two.get());
+    }
+
+    // three is a view of part of the 4th buffer
+    ByteBuffer three = buffers.get(2);
+    Assert.assertSame("Should be a duplicate of the fourth array",
+        three.array(), DATA.get(3).array());
+    Assert.assertEquals(8, three.remaining());
+    Assert.assertEquals(3, three.position());
+    Assert.assertEquals(11, three.limit());
+    Assert.assertEquals(12, three.capacity());
+    for (; i < 24; i += 1) {
+      Assert.assertEquals("Should produce correct values", i, three.get());
+    }
+
+    // four should be a copy of the next 8 bytes
+    ByteBuffer four = buffers.get(3);
+    Assert.assertEquals(8, four.remaining());
+    Assert.assertEquals(0, four.position());
+    Assert.assertEquals(8, four.limit());
+    Assert.assertEquals(8, four.capacity());
+    for (; i < 32; i += 1) {
+      Assert.assertEquals("Should produce correct values", i, four.get());
+    }
+
+    // five should be a copy of the last 3 bytes
+    ByteBuffer five = buffers.get(4);
+    Assert.assertEquals(3, five.remaining());
+    Assert.assertEquals(0, five.position());
+    Assert.assertEquals(3, five.limit());
+    Assert.assertEquals(3, five.capacity());
+    for (; i < 35; i += 1) {
+
Assert.assertEquals("Should produce correct values", i, five.get()); + } + } + + @Test + public void testSliceBuffersData() throws Exception { + ByteBufferInputStream stream = newStream(); + + List buffers = stream.sliceBuffers(stream.available()); + List nonEmptyBuffers = new ArrayList<>(); + for (ByteBuffer buffer : DATA) { + if (buffer.remaining() > 0) { + nonEmptyBuffers.add(buffer); + } + } + + Assert.assertEquals("Should return duplicates of all non-empty buffers", + nonEmptyBuffers, buffers); + } +} diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStream.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStream.java new file mode 100644 index 0000000000..9db23be5a8 --- /dev/null +++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStream.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import org.junit.Assert; +import org.junit.Test; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class TestSingleBufferInputStream extends TestByteBufferInputStreams { + private static final ByteBuffer DATA = ByteBuffer.wrap(new byte[] { + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, + 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34 }); + + @Override + protected ByteBufferInputStream newStream() { + return new SingleBufferInputStream(DATA); + } + + @Override + protected void checkOriginalData() { + Assert.assertEquals("Position should not change", 0, DATA.position()); + Assert.assertEquals("Limit should not change", + DATA.array().length, DATA.limit()); + } + + @Test + public void testSliceData() throws Exception { + ByteBufferInputStream stream = newStream(); + int length = stream.available(); + + List buffers = new ArrayList<>(); + // slice the stream into 3 8-byte buffers and 1 2-byte buffer + while (stream.available() > 0) { + int bytesToSlice = Math.min(stream.available(), 8); + buffers.add(stream.slice(bytesToSlice)); + } + + Assert.assertEquals("Position should be at end", length, stream.position()); + Assert.assertEquals("Should produce 5 buffers", 5, buffers.size()); + + int i = 0; + + ByteBuffer one = buffers.get(0); + Assert.assertSame("Should use the same backing array", + one.array(), DATA.array()); + Assert.assertEquals(8, one.remaining()); + Assert.assertEquals(0, one.position()); + Assert.assertEquals(8, one.limit()); + Assert.assertEquals(35, one.capacity()); + for (; i < 8; i += 1) { + Assert.assertEquals("Should produce correct values", i, one.get()); + } + + ByteBuffer two = buffers.get(1); + Assert.assertSame("Should use the same backing array", + 
two.array(), DATA.array()); + Assert.assertEquals(8, two.remaining()); + Assert.assertEquals(8, two.position()); + Assert.assertEquals(16, two.limit()); + Assert.assertEquals(35, two.capacity()); + for (; i < 16; i += 1) { + Assert.assertEquals("Should produce correct values", i, two.get()); + } + + // three is a copy of part of the 4th buffer + ByteBuffer three = buffers.get(2); + Assert.assertSame("Should use the same backing array", + three.array(), DATA.array()); + Assert.assertEquals(8, three.remaining()); + Assert.assertEquals(16, three.position()); + Assert.assertEquals(24, three.limit()); + Assert.assertEquals(35, three.capacity()); + for (; i < 24; i += 1) { + Assert.assertEquals("Should produce correct values", i, three.get()); + } + + // four should be a copy of the next 8 bytes + ByteBuffer four = buffers.get(3); + Assert.assertSame("Should use the same backing array", + four.array(), DATA.array()); + Assert.assertEquals(8, four.remaining()); + Assert.assertEquals(24, four.position()); + Assert.assertEquals(32, four.limit()); + Assert.assertEquals(35, four.capacity()); + for (; i < 32; i += 1) { + Assert.assertEquals("Should produce correct values", i, four.get()); + } + + // five should be a copy of the next 8 bytes + ByteBuffer five = buffers.get(4); + Assert.assertSame("Should use the same backing array", + five.array(), DATA.array()); + Assert.assertEquals(3, five.remaining()); + Assert.assertEquals(32, five.position()); + Assert.assertEquals(35, five.limit()); + Assert.assertEquals(35, five.capacity()); + for (; i < 35; i += 1) { + Assert.assertEquals("Should produce correct values", i, five.get()); + } + } + + @Test + public void testWholeSliceBuffersData() throws Exception { + ByteBufferInputStream stream = newStream(); + + List buffers = stream.sliceBuffers(stream.available()); + Assert.assertEquals("Should return duplicates of all non-empty buffers", + Collections.singletonList(DATA), buffers); + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java index 87c8ac97d8..8d3e48dd13 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java @@ -37,6 +37,8 @@ public class HadoopReadOptions extends ParquetReadOptions { private final Configuration conf; + private static final String ALLOCATION_SIZE = "parquet.read.allocation.size"; + private HadoopReadOptions(boolean useSignedStringMinMax, boolean useStatsFilter, boolean useDictionaryFilter, @@ -45,11 +47,12 @@ private HadoopReadOptions(boolean useSignedStringMinMax, MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, ByteBufferAllocator allocator, + int maxAllocationSize, Map properties, Configuration conf) { super( useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, recordFilter, - metadataFilter, codecFactory, allocator, properties + metadataFilter, codecFactory, allocator, maxAllocationSize, properties ); this.conf = conf; } @@ -82,6 +85,7 @@ public Builder(Configuration conf) { useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true)); withCodecFactory(HadoopCodecs.newFactory(conf, 0)); withRecordFilter(getFilter(conf)); + withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608)); String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY); if (badRecordThresh != null) { set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh); @@ -92,7 +96,8 @@ public 
Builder(Configuration conf) { public ParquetReadOptions build() { return new HadoopReadOptions( useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, - recordFilter, metadataFilter, codecFactory, allocator, properties, conf); + recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties, + conf); } } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java index 5f2f0a8529..4ef24601c9 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java @@ -38,6 +38,7 @@ public class ParquetReadOptions { private static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true; private static final boolean STATS_FILTERING_ENABLED_DEFAULT = true; private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true; + private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB private final boolean useSignedStringMinMax; private final boolean useStatsFilter; @@ -47,17 +48,19 @@ public class ParquetReadOptions { private final ParquetMetadataConverter.MetadataFilter metadataFilter; private final CompressionCodecFactory codecFactory; private final ByteBufferAllocator allocator; + private final int maxAllocationSize; private final Map properties; ParquetReadOptions(boolean useSignedStringMinMax, - boolean useStatsFilter, - boolean useDictionaryFilter, - boolean useRecordFilter, - FilterCompat.Filter recordFilter, - ParquetMetadataConverter.MetadataFilter metadataFilter, - CompressionCodecFactory codecFactory, - ByteBufferAllocator allocator, - Map properties) { + boolean useStatsFilter, + boolean useDictionaryFilter, + boolean useRecordFilter, + FilterCompat.Filter recordFilter, + ParquetMetadataConverter.MetadataFilter metadataFilter, + CompressionCodecFactory codecFactory, + ByteBufferAllocator allocator, + int maxAllocationSize, + Map properties) { this.useSignedStringMinMax = useSignedStringMinMax; this.useStatsFilter = useStatsFilter; this.useDictionaryFilter = useDictionaryFilter; @@ -66,6 +69,7 @@ public class ParquetReadOptions { this.metadataFilter = metadataFilter; this.codecFactory = codecFactory; this.allocator = allocator; + this.maxAllocationSize = maxAllocationSize; this.properties = Collections.unmodifiableMap(properties); } @@ -101,6 +105,10 @@ public ByteBufferAllocator getAllocator() { return allocator; } + public int getMaxAllocationSize() { + return maxAllocationSize; + } + public Set getPropertyNames() { return properties.keySet(); } @@ -122,16 +130,17 @@ public static Builder builder() { } public static class Builder { - boolean useSignedStringMinMax = false; - boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT; - boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT; - boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT; - FilterCompat.Filter recordFilter = null; - ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER; + protected boolean useSignedStringMinMax = false; + protected boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT; + protected boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT; + protected boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT; + protected FilterCompat.Filter recordFilter = null; + protected ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER; // the page size parameter isn't used when only using the codec factory 
to get decompressors - CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0); - ByteBufferAllocator allocator = new HeapByteBufferAllocator(); - Map properties = new HashMap<>(); + protected CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0); + protected ByteBufferAllocator allocator = new HeapByteBufferAllocator(); + protected int maxAllocationSize = ALLOCATION_SIZE_DEFAULT; + protected Map properties = new HashMap<>(); public Builder useSignedStringMinMax(boolean useSignedStringMinMax) { this.useSignedStringMinMax = useSignedStringMinMax; @@ -203,6 +212,11 @@ public Builder withAllocator(ByteBufferAllocator allocator) { return this; } + public Builder withMaxAllocationInBytes(int allocationSizeInBytes) { + this.maxAllocationSize = allocationSizeInBytes; + return this; + } + public Builder set(String key, String value) { properties.put(key, value); return this; @@ -226,7 +240,7 @@ public Builder copy(ParquetReadOptions options) { public ParquetReadOptions build() { return new ParquetReadOptions( useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, - recordFilter, metadataFilter, codecFactory, allocator, properties); + recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties); } } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index 8befa79592..31d7bba9bf 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -115,7 +115,7 @@ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOEx @Override public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) throws IOException { - ByteBuffer decompressed = decompress(BytesInput.from(input, 0, input.remaining()), uncompressedSize).toByteBuffer(); + ByteBuffer decompressed = decompress(BytesInput.from(input), uncompressedSize).toByteBuffer(); output.put(decompressed); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java index 58e79ace06..1377999940 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java @@ -302,7 +302,9 @@ public BytesInput compress(BytesInput bytes) throws IOException { size = Snappy.compress(this.incoming, outgoing); } - return BytesInput.from(outgoing, 0, (int) size); + outgoing.limit(size); + + return BytesInput.from(outgoing); } @Override diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 1ace040bf9..6ef8a6c35f 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -879,23 +879,23 @@ public void close() throws IOException { * @author Julien Le Dem * */ - private class Chunk extends ByteBufferInputStream { + private class Chunk { - private final ChunkDescriptor descriptor; + protected final ChunkDescriptor descriptor; + protected final ByteBufferInputStream stream; /** * * @param descriptor descriptor for the chunk - * @param data contains the chunk data at offset - * @param 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 1ace040bf9..6ef8a6c35f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -879,23 +879,23 @@ public void close() throws IOException {
    * @author Julien Le Dem
    *
    */
-  private class Chunk extends ByteBufferInputStream {
+  private class Chunk {
 
-    private final ChunkDescriptor descriptor;
+    protected final ChunkDescriptor descriptor;
+    protected final ByteBufferInputStream stream;
 
     /**
      *
      * @param descriptor descriptor for the chunk
-     * @param data contains the chunk data at offset
-     * @param offset where the chunk starts in offset
+     * @param buffers ByteBuffers that contain the chunk
      */
-    public Chunk(ChunkDescriptor descriptor, ByteBuffer data, int offset) {
-      super(data, offset, descriptor.size);
+    public Chunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers) {
       this.descriptor = descriptor;
+      this.stream = ByteBufferInputStream.wrap(buffers);
     }
 
     protected PageHeader readPageHeader() throws IOException {
-      return Util.readPageHeader(this);
+      return Util.readPageHeader(stream);
     }
 
     /**
@@ -967,7 +967,7 @@ public ColumnChunkPageReader readAllPages() throws IOException {
             break;
           default:
             LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
-            this.skip(compressedPageSize);
+            stream.skipFully(compressedPageSize);
             break;
         }
       }
@@ -977,29 +977,19 @@ public ColumnChunkPageReader readAllPages() throws IOException {
             "Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " +
             getPath() + " offset " + descriptor.metadata.getFirstDataPageOffset() + " but got "
             + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
-            + " pages ending at file offset " + (descriptor.fileOffset + pos()));
+            + " pages ending at file offset " + (descriptor.fileOffset + stream.position()));
       }
       BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
       return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage);
     }
 
-    /**
-     * @return the current position in the chunk
-     */
-    public int pos() {
-      return this.byteBuf.position();
-    }
-
     /**
      * @param size the size of the page
      * @return the page
      * @throws IOException
      */
     public BytesInput readAsBytesInput(int size) throws IOException {
-      int pos = this.byteBuf.position();
-      final BytesInput r = BytesInput.from(this.byteBuf, pos, size);
-      this.byteBuf.position(pos + size);
-      return r;
+      return BytesInput.from(stream.sliceBuffers(size));
     }
 
   }
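The `Chunk` rewrite above trades manual offset bookkeeping for two `ByteBufferInputStream` calls: `wrap(...)` presents a list of buffers as a single stream, and `sliceBuffers(n)` consumes the next n bytes as zero-copy slices. A sketch of that usage pattern, assuming the `wrap`/`sliceBuffers`/`position` signatures that appear in this diff; the payload bytes are made up:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.parquet.bytes.ByteBufferInputStream;

public class SliceSketch {
  public static void main(String[] args) throws IOException {
    // Two backing buffers, as the chunked allocation in readAll() would produce.
    List<ByteBuffer> buffers = Arrays.asList(
        ByteBuffer.wrap(new byte[] {0, 1, 2, 3}),
        ByteBuffer.wrap(new byte[] {4, 5, 6, 7}));

    ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers);

    // Take the next 6 bytes as slices; a page can span a buffer boundary
    // without copying, which is what readAsBytesInput(size) relies on.
    List<ByteBuffer> page = stream.sliceBuffers(6);
    System.out.println(page.size());       // 2 slices: 4 bytes + 2 bytes
    System.out.println(stream.position()); // 6
  }
}
```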
@@ -1016,44 +1006,51 @@ private class WorkaroundChunk extends Chunk {
 
     /**
      * @param descriptor the descriptor of the chunk
-     * @param byteBuf contains the data of the chunk at offset
-     * @param offset where the chunk starts in data
      * @param f the file stream positioned at the end of this chunk
      */
-    private WorkaroundChunk(ChunkDescriptor descriptor, ByteBuffer byteBuf, int offset, SeekableInputStream f) {
-      super(descriptor, byteBuf, offset);
+    private WorkaroundChunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f) {
+      super(descriptor, buffers);
       this.f = f;
     }
 
     protected PageHeader readPageHeader() throws IOException {
       PageHeader pageHeader;
-      int initialPos = pos();
+      stream.mark(8192); // headers should not be larger than 8k
       try {
-        pageHeader = Util.readPageHeader(this);
+        pageHeader = Util.readPageHeader(stream);
       } catch (IOException e) {
         // this is to workaround a bug where the compressedLength
         // of the chunk is missing the size of the header of the dictionary
         // to allow reading older files (using dictionary) we need this.
         // usually 13 to 19 bytes are missing
         // if the last page is smaller than this, the page header itself is truncated in the buffer.
-        this.byteBuf.position(initialPos); // resetting the buffer to the position before we got the error
+        stream.reset(); // resetting the buffer to the position before we got the error
         LOG.info("completing the column chunk to read the page header");
-        pageHeader = Util.readPageHeader(new SequenceInputStream(this, f)); // trying again from the buffer + remainder of the stream.
+        pageHeader = Util.readPageHeader(new SequenceInputStream(stream, f)); // trying again from the buffer + remainder of the stream.
       }
       return pageHeader;
     }
 
     public BytesInput readAsBytesInput(int size) throws IOException {
-      if (pos() + size > initPos + count) {
+      int available = stream.available();
+      if (size > available) {
         // this is to workaround a bug where the compressedLength
         // of the chunk is missing the size of the header of the dictionary
         // to allow reading older files (using dictionary) we need this.
         // usually 13 to 19 bytes are missing
-        int l1 = initPos + count - pos();
-        int l2 = size - l1;
-        LOG.info("completed the column chunk with {} bytes", l2);
-        return BytesInput.concat(super.readAsBytesInput(l1), BytesInput.copy(BytesInput.from(f, l2)));
+        int missingBytes = size - available;
+        LOG.info("completed the column chunk with {} bytes", missingBytes);
+
+        List<ByteBuffer> buffers = new ArrayList<>();
+        buffers.addAll(stream.sliceBuffers(available));
+
+        ByteBuffer lastBuffer = ByteBuffer.allocate(missingBytes);
+        f.readFully(lastBuffer);
+        buffers.add(lastBuffer);
+
+        return BytesInput.from(buffers);
       }
+
       return super.readAsBytesInput(size);
     }
 
@@ -1126,22 +1123,36 @@ public List<Chunk> readAll(SeekableInputStream f) throws IOException {
       List<Chunk> result = new ArrayList<Chunk>(chunks.size());
       f.seek(offset);
-      // Allocate the bytebuffer based on whether the FS can support it.
-      ByteBuffer chunksByteBuffer = options.getAllocator().allocate(length);
-      f.readFully(chunksByteBuffer);
+
+      int fullAllocations = length / options.getMaxAllocationSize();
+      int lastAllocationSize = length % options.getMaxAllocationSize();
+
+      int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
+      List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
+
+      for (int i = 0; i < fullAllocations; i += 1) {
+        buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
+      }
+
+      if (lastAllocationSize > 0) {
+        buffers.add(options.getAllocator().allocate(lastAllocationSize));
+      }
+
+      for (ByteBuffer buffer : buffers) {
+        f.readFully(buffer);
+        buffer.flip();
+      }
 
       // report in a counter the data we just scanned
       BenchmarkCounter.incrementBytesRead(length);
-      int currentChunkOffset = 0;
+
+      ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers);
       for (int i = 0; i < chunks.size(); i++) {
         ChunkDescriptor descriptor = chunks.get(i);
         if (i < chunks.size() - 1) {
-          result.add(new Chunk(descriptor, chunksByteBuffer, currentChunkOffset));
+          result.add(new Chunk(descriptor, stream.sliceBuffers(descriptor.size)));
         } else {
           // because of a bug, the last chunk might be larger than descriptor.size
-          result.add(new WorkaroundChunk(descriptor, chunksByteBuffer, currentChunkOffset, f));
+          result.add(new WorkaroundChunk(descriptor, stream.sliceBuffers(descriptor.size), f));
         }
-        currentChunkOffset += descriptor.size;
       }
       return result ;
     }
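The allocation loop in `readAll` is integer division plus a remainder check. The same arithmetic worked through once, using the 8MB default and a hypothetical 20MB column-chunk range:

```java
public class AllocationMath {
  public static void main(String[] args) {
    int maxAllocationSize = 8 * 1024 * 1024; // ALLOCATION_SIZE_DEFAULT
    int length = 20 * 1024 * 1024;           // hypothetical read length

    int fullAllocations = length / maxAllocationSize;                        // 2
    int lastAllocationSize = length % maxAllocationSize;                     // 4MB
    int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0); // 3

    // readAll would allocate two 8MB buffers plus one 4MB tail buffer,
    // readFully() and flip() each one, then wrap them all in a single
    // ByteBufferInputStream for slicing into Chunks.
    System.out.printf("%d buffers (%d full + %d-byte tail)%n",
        numAllocations, fullAllocations, lastAllocationSize);
  }
}
```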
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
index 3dd17e96c0..9d9a72f332 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
@@ -73,7 +73,7 @@ private void test(int size, CompressionCodecName codec, boolean useOnHeapCompres
       if (useOnHeapCompression) {
         compressed = c.compress(BytesInput.from(rawArr));
       } else {
-        compressed = c.compress(BytesInput.from(rawBuf, 0, rawBuf.remaining()));
+        compressed = c.compress(BytesInput.from(rawBuf));
       }
 
       switch (decomp) {
@@ -95,11 +95,11 @@ private void test(int size, CompressionCodecName codec, boolean useOnHeapCompres
 
         case OFF_HEAP_BYTES_INPUT: {
           final ByteBuffer buf = compressed.toByteBuffer();
-          final ByteBuffer b = allocator.allocate(buf.capacity());
+          final ByteBuffer b = allocator.allocate(buf.limit());
          try {
             b.put(buf);
             b.flip();
-            final BytesInput input = d.decompress(BytesInput.from(b, 0, b.capacity()), size);
+            final BytesInput input = d.decompress(BytesInput.from(b), size);
             Assert.assertArrayEquals(
                 String.format("While testing codec %s", codec),
                 input.toByteArray(), rawArr);