@@ -888,20 +888,47 @@ public void appendTo(ParquetFileWriter writer) throws IOException {
writer.appendRowGroups(f, blocks, true);
}

/**
* Reads all the columns requested from the row group at the specified block.
*
* @param blockIndex the index of the requested block
* @throws IOException if an error occurs while reading
* @return the PageReadStore which can provide PageReaders for each column.
*/
public PageReadStore readRowGroup(int blockIndex) throws IOException {
return internalReadRowGroup(blockIndex);
}
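For illustration only (not part of this patch): a minimal caller sketch of the new index-based access. The file path comes from the command line; error handling is omitted and the requested columns are whatever the read options default to.

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ReadRowGroupExample {
  public static void main(String[] args) throws IOException {
    try (ParquetFileReader reader = ParquetFileReader.open(
        HadoopInputFile.fromPath(new Path(args[0]), new Configuration()))) {
      // Random access: read the first row group by its index.
      PageReadStore rowGroup = reader.readRowGroup(0); // null for an out-of-range index
      if (rowGroup != null) {
        System.out.println("rows in row group 0: " + rowGroup.getRowCount());
      }
    }
  }
}
```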

/**
* Reads all the columns requested from the row group at the current file position.
* @throws IOException if an error occurs while reading
* @return the PageReadStore which can provide PageReaders for each column.
*/
public PageReadStore readNextRowGroup() throws IOException {
if (currentBlock == blocks.size()) {
ColumnChunkPageReadStore rowGroup = internalReadRowGroup(currentBlock);
if (rowGroup == null) {
return null;
}
BlockMetaData block = blocks.get(currentBlock);
this.currentRowGroup = rowGroup;
// avoid re-reading bytes the dictionary reader is used after this call
if (nextDictionaryReader != null) {
nextDictionaryReader.setRowGroup(currentRowGroup);
}

advanceToNextBlock();

return currentRowGroup;
}
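The sequential path is typically driven in a loop; a sketch with the same assumed setup as above:

```java
PageReadStore pages;
while ((pages = reader.readNextRowGroup()) != null) {
  // Each call advances the reader to the next row group;
  // pages.getPageReader(column) yields the PageReader for a requested column.
}
```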

private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOException {
if (blockIndex < 0 || blockIndex >= blocks.size()) {
return null;
}
BlockMetaData block = blocks.get(blockIndex);
if (block.getRowCount() == 0) {
throw new RuntimeException("Illegal row group of 0 rows");
}
this.currentRowGroup = new ColumnChunkPageReadStore(block.getRowCount());
ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(block.getRowCount());
// prepare the list of consecutive parts to read them in one scan
List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
ConsecutivePartList currentParts = null;
@@ -920,22 +947,54 @@ public PageReadStore readNextRowGroup() throws IOException {
}
}
// actually read all the chunks
ChunkListBuilder builder = new ChunkListBuilder();
ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
for (ConsecutivePartList consecutiveChunks : allParts) {
consecutiveChunks.readAll(f, builder);
}
for (Chunk chunk : builder.build()) {
readChunkPages(chunk, block);
readChunkPages(chunk, block, rowGroup);
}

// avoid re-reading bytes the dictionary reader is used after this call
if (nextDictionaryReader != null) {
nextDictionaryReader.setRowGroup(currentRowGroup);
return rowGroup;
}

/**
* Reads all the columns requested from the specified row group. It may skip specific pages based on the column
* indexes according to the actual filter. As the rows are not aligned among the pages of the different columns, row
* synchronization might be required. See the documentation of the class SynchronizingColumnReader for details.
*
* @param blockIndex the index of the requested block
* @return the PageReadStore which can provide PageReaders for each column or null if there are no rows in this block
* @throws IOException if an error occurs while reading
*/
public PageReadStore readFilteredRowGroup(int blockIndex) throws IOException {
Contributor:

What is the reason behind using a BlockMetaData argument for readRowGroup while you use the index of the row group here? I think both should work similarly.

Contributor (Author):

I think using BlockMetaData is better as it is similar to the existing getDictionaryReader(BlockMetaData). For the filtered row group, we need the index to access the RowRanges and ColumnIndexStore.

453a6cc#diff-8da24c84aef62e6e836d073938f7843d289785baaeddf446f3afeae6d4ef4b10R983

453a6cc#diff-8da24c84aef62e6e836d073938f7843d289785baaeddf446f3afeae6d4ef4b10R994

What do you think is the better approach?
Accept BlockMetaData and find the index in the list.
-or-
Change the other method signatures to also use indexes.

Contributor:

It depends on how you plan to use it. If you don't plan to cache the related metadata outside of the reader, I think it is cleaner to use the index, because it does not suggest that arbitrary metadata may be passed in rather than the metadata actually in the file. On the other hand, you may want to select specific row groups to read based on their metadata; in that case you already have the related object, so it is easier to pass it directly instead of looking up its index.

Contributor (Author):

I agree that using the index is cleaner. I added index-based (int) variants of the existing BlockMetaData-based methods.
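For context, the two flavours of the API would be called roughly like this (hypothetical caller code, not part of the patch; `reader` is assumed to be an open ParquetFileReader):

```java
// Index-based: address the row group by its position in the file.
PageReadStore second = reader.readRowGroup(1);

// Metadata-based: pass a BlockMetaData already selected from the footer,
// as with the existing dictionary-reader API.
BlockMetaData block = reader.getFooter().getBlocks().get(1);
DictionaryPageReadStore dictionaries = reader.getDictionaryReader(block);
```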

if (blockIndex < 0 || blockIndex >= blocks.size()) {
return null;
}

advanceToNextBlock();
// Filtering not required -> fall back to the non-filtering path
if (!options.useColumnIndexFilter() || !FilterCompat.isFilteringRequired(options.getRecordFilter())) {
return internalReadRowGroup(blockIndex);
}

return currentRowGroup;
BlockMetaData block = blocks.get(blockIndex);
if (block.getRowCount() == 0) {
throw new RuntimeException("Illegal row group of 0 rows");
}

RowRanges rowRanges = getRowRanges(blockIndex);
long rowCount = rowRanges.rowCount();
if (rowCount == 0) {
// There are no matching rows -> returning null
return null;
}

if (rowCount == block.getRowCount()) {
// All rows are matching -> fall back to the non-filtering path
return internalReadRowGroup(blockIndex);
}

return internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(blockIndex));
}
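A rough usage sketch of the filtered path (assuming the reader was opened with a record filter and options.useColumnIndexFilter() enabled):

```java
for (int i = 0; i < reader.getRowGroups().size(); i++) {
  PageReadStore filtered = reader.readFilteredRowGroup(i);
  if (filtered == null) {
    continue; // out-of-range index, or no rows in this row group match the filter
  }
  // Consume the pages per column; row synchronization across columns is done
  // by the record-level readers (see SynchronizingColumnReader).
}
```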

/**
@@ -945,21 +1004,20 @@ public PageReadStore readNextRowGroup() throws IOException {
* details.
*
* @return the PageReadStore which can provide PageReaders for each column
* @throws IOException
* if any I/O error occurs while reading
* @throws IOException if an error occurs while reading
*/
public PageReadStore readNextFilteredRowGroup() throws IOException {
if (currentBlock == blocks.size()) {
return null;
}
// Filtering not required -> fall back to the non-filtering path
if (!options.useColumnIndexFilter() || !FilterCompat.isFilteringRequired(options.getRecordFilter())) {
return readNextRowGroup();
}
BlockMetaData block = blocks.get(currentBlock);
if (block.getRowCount() == 0) {
throw new RuntimeException("Illegal row group of 0 rows");
}
ColumnIndexStore ciStore = getColumnIndexStore(currentBlock);
RowRanges rowRanges = getRowRanges(currentBlock);
long rowCount = rowRanges.rowCount();
if (rowCount == 0) {
@@ -972,9 +1030,22 @@ public PageReadStore readNextFilteredRowGroup() throws IOException {
return readNextRowGroup();
}

this.currentRowGroup = new ColumnChunkPageReadStore(rowRanges);
this.currentRowGroup = internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(currentBlock));

// avoid re-reading bytes the dictionary reader is used after this call
if (nextDictionaryReader != null) {
nextDictionaryReader.setRowGroup(currentRowGroup);
}

advanceToNextBlock();

return this.currentRowGroup;
}

private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData block, RowRanges rowRanges, ColumnIndexStore ciStore) throws IOException {
ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(rowRanges);
// prepare the list of consecutive parts to read them in one scan
ChunkListBuilder builder = new ChunkListBuilder();
ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
ConsecutivePartList currentParts = null;
for (ColumnChunkMetaData mc : block.getColumns()) {
@@ -1005,31 +1076,24 @@ public PageReadStore readNextFilteredRowGroup() throws IOException {
consecutiveChunks.readAll(f, builder);
}
for (Chunk chunk : builder.build()) {
readChunkPages(chunk, block);
}

// avoid re-reading bytes the dictionary reader is used after this call
if (nextDictionaryReader != null) {
nextDictionaryReader.setRowGroup(currentRowGroup);
readChunkPages(chunk, block, rowGroup);
}

advanceToNextBlock();

return currentRowGroup;
return rowGroup;
}

private void readChunkPages(Chunk chunk, BlockMetaData block) throws IOException {
private void readChunkPages(Chunk chunk, BlockMetaData block, ColumnChunkPageReadStore rowGroup) throws IOException {
if (null == fileDecryptor || fileDecryptor.plaintextFile()) {
currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
rowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
return;
}
// Encrypted file
ColumnPath columnPath = ColumnPath.get(chunk.descriptor.col.getPath());
InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(columnPath);
if (!columnDecryptionSetup.isEncrypted()) { // plaintext column
currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
rowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
} else { // encrypted column
currentRowGroup.addColumn(chunk.descriptor.col,
rowGroup.addColumn(chunk.descriptor.col,
chunk.readAllPages(columnDecryptionSetup.getMetaDataDecryptor(), columnDecryptionSetup.getDataDecryptor(),
fileDecryptor.getFileAAD(), block.getOrdinal(), columnDecryptionSetup.getOrdinal()));
}
@@ -1080,12 +1144,19 @@ private boolean advanceToNextBlock() {
* @return a DictionaryPageReadStore for the next row group
*/
public DictionaryPageReadStore getNextDictionaryReader() {
if (nextDictionaryReader == null && currentBlock < blocks.size()) {
this.nextDictionaryReader = getDictionaryReader(blocks.get(currentBlock));
if (nextDictionaryReader == null) {
this.nextDictionaryReader = getDictionaryReader(currentBlock);
}
return nextDictionaryReader;
}

public DictionaryPageReader getDictionaryReader(int blockIndex) {
if (blockIndex < 0 || blockIndex >= blocks.size()) {
return null;
}
return new DictionaryPageReader(this, blocks.get(blockIndex));
}

public DictionaryPageReader getDictionaryReader(BlockMetaData block) {
return new DictionaryPageReader(this, block);
}
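The dictionary readers follow the same pattern; a caller sketch (assuming `reader` and a ColumnDescriptor `column` are available):

```java
DictionaryPageReadStore dictionaries = reader.getDictionaryReader(0); // null for an out-of-range index
// readDictionaryPage returns null for columns that have no dictionary.
DictionaryPage dictionary = dictionaries.readDictionaryPage(column);
```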
@@ -1167,6 +1238,13 @@ private DictionaryPage readCompressedDictionary(
converter.getEncoding(dictHeader.getEncoding()));
}

public BloomFilterReader getBloomFilterDataReader(int blockIndex) {
if (blockIndex < 0 || blockIndex >= blocks.size()) {
return null;
}
return new BloomFilterReader(this, blocks.get(blockIndex));
}

public BloomFilterReader getBloomFilterDataReader(BlockMetaData block) {
return new BloomFilterReader(this, block);
}
@@ -1315,8 +1393,13 @@ private class ChunkData {

private final Map<ChunkDescriptor, ChunkData> map = new HashMap<>();
private ChunkDescriptor lastDescriptor;
private final long rowCount;
private SeekableInputStream f;

public ChunkListBuilder(long rowCount) {
this.rowCount = rowCount;
}

void add(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f) {
ChunkData data = map.get(descriptor);
if (data == null) {
@@ -1345,9 +1428,9 @@ List<Chunk> build() {
ChunkData data = entry.getValue();
if (descriptor.equals(lastDescriptor)) {
// because of a bug, the last chunk might be larger than descriptor.size
chunks.add(new WorkaroundChunk(lastDescriptor, data.buffers, f, data.offsetIndex));
chunks.add(new WorkaroundChunk(lastDescriptor, data.buffers, f, data.offsetIndex, rowCount));
} else {
chunks.add(new Chunk(descriptor, data.buffers, data.offsetIndex));
chunks.add(new Chunk(descriptor, data.buffers, data.offsetIndex, rowCount));
}
}
return chunks;
@@ -1362,16 +1445,18 @@ private class Chunk {
protected final ChunkDescriptor descriptor;
protected final ByteBufferInputStream stream;
final OffsetIndex offsetIndex;
final long rowCount;

/**
* @param descriptor descriptor for the chunk
* @param buffers ByteBuffers that contain the chunk
* @param offsetIndex the offset index for this column; might be null
*/
public Chunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers, OffsetIndex offsetIndex) {
public Chunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers, OffsetIndex offsetIndex, long rowCount) {
this.descriptor = descriptor;
this.stream = ByteBufferInputStream.wrap(buffers);
this.offsetIndex = offsetIndex;
this.rowCount = rowCount;
}

protected PageHeader readPageHeader() throws IOException {
@@ -1518,7 +1603,7 @@ public ColumnChunkPageReader readAllPages(BlockCipher.Decryptor headerBlockDecry
}
BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage, offsetIndex,
blocks.get(currentBlock).getRowCount(), pageBlockDecryptor, aadPrefix, rowGroupOrdinal, columnOrdinal);
rowCount, pageBlockDecryptor, aadPrefix, rowGroupOrdinal, columnOrdinal);
}

private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) {
@@ -1556,8 +1641,8 @@ private class WorkaroundChunk extends Chunk {
* @param descriptor the descriptor of the chunk
* @param f the file stream positioned at the end of this chunk
*/
private WorkaroundChunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f, OffsetIndex offsetIndex) {
super(descriptor, buffers, offsetIndex);
private WorkaroundChunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f, OffsetIndex offsetIndex, long rowCount) {
super(descriptor, buffers, offsetIndex, rowCount);
this.f = f;
}
