Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import static java.lang.Math.max;
Comment thread
raunaqmorarka marked this conversation as resolved.
Outdated
import static java.lang.Math.min;
import static java.lang.Math.toIntExact;
import static java.util.Collections.nCopies;
import static java.util.Objects.requireNonNull;

public class ParquetReader
Expand Down Expand Up @@ -130,10 +131,11 @@ public ParquetReader(
ParquetDataSource dataSource,
DateTimeZone timeZone,
AggregatedMemoryContext memoryContext,
ParquetReaderOptions options)
ParquetReaderOptions options,
Optional<Predicate> parquetPredicate)
throws IOException
{
this(fileCreatedBy, fields, blocks, firstRowsOfBlocks, dataSource, timeZone, memoryContext, options, null, null);
this(fileCreatedBy, fields, blocks, firstRowsOfBlocks, dataSource, timeZone, memoryContext, options, parquetPredicate, nCopies(blocks.size(), Optional.empty()));
Comment thread
raunaqmorarka marked this conversation as resolved.
Outdated
}

public ParquetReader(
Expand All @@ -145,7 +147,7 @@ public ParquetReader(
DateTimeZone timeZone,
AggregatedMemoryContext memoryContext,
ParquetReaderOptions options,
Predicate parquetPredicate,
Optional<Predicate> parquetPredicate,
List<Optional<ColumnIndexStore>> columnIndexStore)
throws IOException
{
Expand All @@ -164,14 +166,16 @@ public ParquetReader(

checkArgument(blocks.size() == firstRowsOfBlocks.size(), "elements of firstRowsOfBlocks must correspond to blocks");

this.columnIndexStore = columnIndexStore;
this.blockRowRanges = listWithNulls(this.blocks.size());
for (PrimitiveField field : primitiveFields) {
ColumnDescriptor columnDescriptor = field.getDescriptor();
this.paths.put(ColumnPath.get(columnDescriptor.getPath()), columnDescriptor);
}
if (parquetPredicate != null && options.isUseColumnIndex()) {
this.filter = parquetPredicate.toParquetFilter(timeZone);

requireNonNull(parquetPredicate, "parquetPredicate is null");
this.columnIndexStore = requireNonNull(columnIndexStore, "columnIndexStore is null");
if (parquetPredicate.isPresent() && options.isUseColumnIndex()) {
this.filter = parquetPredicate.get().toParquetFilter(timeZone);
}
else {
this.filter = Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ && predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parq
timeZone,
newSimpleAggregatedMemoryContext(),
options,
parquetPredicate,
Optional.of(parquetPredicate),
columnIndexes.build());

ConnectorPageSource parquetPageSource = new ParquetPageSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,8 @@ else if (column.getId() == TRINO_MERGE_PARTITION_DATA) {
dataSource,
UTC,
memoryContext,
options);
options,
Optional.empty());

return new ReaderPageSourceWithRowPositions(
new ReaderPageSource(
Expand Down