@@ -671,7 +671,7 @@ private static Optional<Collection<Object>> extractDiscreteValues(int domainComp
return Optional.of(valueSet.getDiscreteSet());
}

-    private FilterPredicate convertToParquetFilter(DateTimeZone timeZone)
+    public FilterPredicate convertToParquetFilter(DateTimeZone timeZone)
{
FilterPredicate filter = null;

@@ -155,7 +155,7 @@ public ParquetReader(
AggregatedMemoryContext memoryContext,
ParquetReaderOptions options,
Function<Exception, RuntimeException> exceptionTransform,
-            Optional<TupleDomainParquetPredicate> parquetPredicate,
+            Optional<FilterPredicate> filter,
List<Optional<ColumnIndexStore>> columnIndexStore,
Optional<ParquetWriteValidation> writeValidation)
throws IOException
@@ -187,12 +187,8 @@ public ParquetReader(
this.writeChecksumBuilder = writeValidation.map(validation -> createWriteChecksumBuilder(validation.getTypes()));
this.rowGroupStatisticsValidation = writeValidation.map(validation -> createStatisticsValidationBuilder(validation.getTypes()));

-        requireNonNull(parquetPredicate, "parquetPredicate is null");
+        requireNonNull(filter, "filter is null");
        this.columnIndexStore = requireNonNull(columnIndexStore, "columnIndexStore is null");
-        Optional<FilterPredicate> filter = Optional.empty();
-        if (parquetPredicate.isPresent() && options.isUseColumnIndex()) {
-            filter = parquetPredicate.get().toParquetFilter(timeZone);
-        }
this.blockRowRanges = calculateFilteredRowRanges(blocks, filter, columnIndexStore, primitiveFields);

this.blockFactory = new ParquetBlockFactory(exceptionTransform);
@@ -536,6 +532,14 @@ public AggregatedMemoryContext getMemoryContext()
return memoryContext;
}

+    public static Optional<FilterPredicate> toParquetFilter(Optional<TupleDomainParquetPredicate> parquetPredicate, DateTimeZone timeZone, ParquetReaderOptions options)
+    {
+        if (parquetPredicate.isPresent() && options.isUseColumnIndex()) {
+            return parquetPredicate.get().toParquetFilter(timeZone);
+        }
Comment on lines +537 to +539
Contributor:
Suggested change
-        if (parquetPredicate.isPresent() && options.isUseColumnIndex()) {
-            return parquetPredicate.get().toParquetFilter(timeZone);
-        }
+        if (options.isUseColumnIndex()) {
+            return parquetPredicate.flatMap(predicate -> predicate.toParquetFilter(timeZone));
+        }
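The flatMap form reads tighter: the presence check folds into the Optional chain, and both the empty-predicate case and the disabled-column-index case fall through to the same Optional.empty() below, so behavior is unchanged.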

+        return Optional.empty();
+    }

private static FilteredRowRanges[] calculateFilteredRowRanges(
List<BlockMetaData> blocks,
Optional<FilterPredicate> filter,
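Net effect of the ParquetReader changes above: the predicate-to-filter conversion moves out of the constructor into the new static toParquetFilter helper, so callers convert a TupleDomainParquetPredicate once and hand the reader a ready Optional<FilterPredicate>. That is what lets CheckpointEntryIterator (below) OR several per-entry-type filters into a single predicate before building the reader. A minimal caller-side sketch of the new pattern, assuming the surrounding variables (parquetPredicate, options, blocks, and so on) are in scope as at the existing call sites:

// Sketch, not verbatim PR code: convert the predicate up front.
// toParquetFilter returns Optional.empty() unless column indexes are enabled.
Optional<FilterPredicate> filter = ParquetReader.toParquetFilter(Optional.of(parquetPredicate), timeZone, options);

ParquetReader reader = new ParquetReader(
        Optional.ofNullable(fileMetaData.getCreatedBy()),
        columnFields,
        blocks,
        blockStarts,
        dataSource,
        timeZone,
        memoryContext,
        options,
        exception -> handleException(dataSourceId, exception),
        filter,                 // was Optional.of(parquetPredicate)
        columnIndexStores,
        Optional.empty());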
11 changes: 5 additions & 6 deletions plugin/trino-delta-lake/pom.xml
@@ -144,6 +144,11 @@
<artifactId>trino-hive</artifactId>
</dependency>

+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-memory-context</artifactId>
+        </dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-parquet</artifactId>
@@ -257,12 +262,6 @@
<scope>runtime</scope>
</dependency>

-        <dependency>
-            <groupId>io.trino</groupId>
-            <artifactId>trino-memory-context</artifactId>
-            <scope>runtime</scope>
-        </dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
@@ -20,7 +20,15 @@
import com.google.common.math.LongMath;
import io.airlift.log.Logger;
import io.trino.filesystem.TrinoInputFile;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetDataSourceId;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.ParquetWriteValidation;
import io.trino.parquet.predicate.TupleDomainParquetPredicate;
import io.trino.parquet.reader.MetadataReader;
import io.trino.parquet.reader.ParquetReader;
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.DeltaLakeColumnMetadata;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
@@ -37,6 +45,7 @@
import io.trino.plugin.hive.HiveColumnHandle.ColumnType;
import io.trino.plugin.hive.HiveColumnProjectionInfo;
import io.trino.plugin.hive.HiveType;
import io.trino.plugin.hive.ReaderColumns;
import io.trino.plugin.hive.ReaderPageSource;
import io.trino.plugin.hive.parquet.ParquetPageSourceFactory;
import io.trino.spi.Page;
@@ -54,11 +63,20 @@
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
import jakarta.annotation.Nullable;
import org.joda.time.DateTimeZone;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -71,7 +89,10 @@
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.parquet.ParquetTypeUtils.getColumnIO;
import static io.trino.parquet.ParquetTypeUtils.getDescriptors;
import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
import static io.trino.parquet.predicate.PredicateUtils.predicateMatches;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_DATA;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
@@ -85,6 +106,11 @@
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.REMOVE;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.TRANSACTION;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns;
import static io.trino.plugin.hive.parquet.ParquetPageSource.handleException;
import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.createParquetPageSource;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
@@ -98,6 +124,8 @@
import static java.math.RoundingMode.UNNECESSARY;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toUnmodifiableList;
import static org.joda.time.DateTimeZone.UTC;

public class CheckpointEntryIterator
extends AbstractIterator<DeltaLakeTransactionLogEntry>
@@ -182,23 +210,139 @@ public CheckpointEntryIterator(
.map(field -> buildColumnHandle(field, checkpointSchemaManager, this.metadataEntry, this.protocolEntry).toHiveColumnHandle())
.collect(toImmutableList());

-        TupleDomain<HiveColumnHandle> tupleDomain = columns.size() > 1 ?
-                TupleDomain.all() :
-                buildTupleDomainColumnHandle(getOnlyElement(fields), getOnlyElement(columns));
-
-        ReaderPageSource pageSource = ParquetPageSourceFactory.createPageSource(
-                checkpoint,
-                0,
-                fileSize,
-                columns,
-                tupleDomain,
-                true,
-                DateTimeZone.UTC,
-                stats,
-                parquetReaderOptions,
-                Optional.empty(),
-                domainCompactionThreshold,
-                OptionalLong.empty());
+        List<TupleDomain<HiveColumnHandle>> tupleDomains; // OR-ed conditions, one per requested entry type
+        if (columns.isEmpty()) {
+            tupleDomains = ImmutableList.of(TupleDomain.all());
+        }
+        else {
+            ImmutableList.Builder<TupleDomain<HiveColumnHandle>> builder = ImmutableList.builder();
+            int i = 0;
+            for (EntryType field : fields) {
+                builder.add(buildTupleDomainColumnHandle(field, columns.get(i++)));
+            }
+            tupleDomains = builder.build();
+        }

+        ReaderPageSource pageSource;
+        Optional<ParquetWriteValidation> parquetWriteValidation = Optional.empty();
+        MessageType fileSchema;
+        MessageType requestedSchema;
+        MessageColumnIO messageColumn;
+        ParquetDataSource dataSource = null;
+        try {
+            AggregatedMemoryContext memoryContext = AggregatedMemoryContext.newSimpleAggregatedMemoryContext();
+            dataSource = ParquetPageSourceFactory.createDataSource(checkpoint, OptionalLong.empty(), parquetReaderOptions, memoryContext, stats);
+
+            ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, parquetWriteValidation);
+            FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
+            fileSchema = fileMetaData.getSchema();
+
+            Optional<MessageType> message = ParquetPageSourceFactory.getParquetMessageType(columns, true, fileSchema);
+
+            requestedSchema = message.orElse(new MessageType(fileSchema.getName(), ImmutableList.of()));
+            messageColumn = getColumnIO(fileSchema, requestedSchema);
+
+            Map<List<String>, ColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, requestedSchema);
+
+            List<ParquetDomainAndPredicate> parquetDomains = new ArrayList<>();
+            FilterPredicate nullableFilterPredicate = null;
+            for (TupleDomain<HiveColumnHandle> domain : tupleDomains) {
+                TupleDomain<ColumnDescriptor> parquetTupleDomain = parquetReaderOptions.isIgnoreStatistics()
+                        ? TupleDomain.all()
+                        : ParquetPageSourceFactory.getParquetTupleDomain(descriptorsByPath, domain, fileSchema, true);
+                TupleDomainParquetPredicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, UTC);
+                parquetDomains.add(new ParquetDomainAndPredicate(parquetTupleDomain, parquetPredicate));
+
+                // OR the per-entry-type filters together so a row group is kept when any requested type matches
+                FilterPredicate filter = parquetPredicate.convertToParquetFilter(UTC);
+                if (filter != null) {
+                    if (nullableFilterPredicate == null) {
+                        nullableFilterPredicate = filter;
+                    }
+                    else {
+                        nullableFilterPredicate = FilterApi.or(nullableFilterPredicate, filter);
Contributor:
Add a corresponding test in TestCheckpointEntryIterator
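One possible shape for that test, as a sketch only; writeCheckpointWithEntries and createCheckpointEntryIterator are hypothetical stand-ins for whatever fixtures TestCheckpointEntryIterator already provides:

// Hypothetical test sketch (helper names assumed, not from this PR):
// requesting several entry types exercises the OR-ed row-group filter,
// which must not prune any of the requested types.
@Test
public void testOredFiltersKeepAllRequestedEntryTypes()
        throws Exception
{
    String checkpoint = writeCheckpointWithEntries(); // contains metadata, protocol and add entries
    CheckpointEntryIterator iterator = createCheckpointEntryIterator(
            checkpoint,
            ImmutableSet.of(METADATA, PROTOCOL, ADD));
    List<DeltaLakeTransactionLogEntry> entries = ImmutableList.copyOf(iterator);
    assertThat(entries).anyMatch(entry -> entry.getMetaData() != null);
    assertThat(entries).anyMatch(entry -> entry.getProtocol() != null);
    assertThat(entries).anyMatch(entry -> entry.getAdd() != null);
}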

+                    }
+                }
+            }

+            long nextStart = 0;
+            ImmutableList.Builder<BlockMetaData> blocks = ImmutableList.builder();
+            ImmutableList.Builder<Long> blockStarts = ImmutableList.builder();
+            ImmutableList.Builder<Optional<ColumnIndexStore>> columnIndexes = ImmutableList.builder();
+            for (BlockMetaData block : parquetMetadata.getBlocks()) {
+                long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
+                boolean isBlockAdded = false;
+                for (ParquetDomainAndPredicate domain : parquetDomains) {
+                    if (isBlockAdded) {
+                        break;
+                    }
+
+                    TupleDomain<ColumnDescriptor> parquetTupleDomain = domain.tupleDomain;
+                    TupleDomainParquetPredicate parquetPredicate = domain.predicate;
+                    Optional<ColumnIndexStore> columnIndex = ParquetPageSourceFactory.getColumnIndexStore(dataSource, block, descriptorsByPath, parquetTupleDomain, parquetReaderOptions);
+
+                    if (0 <= firstDataPage && firstDataPage < fileSize &&
+                            predicateMatches(
+                                    parquetPredicate,
+                                    block,
+                                    dataSource,
+                                    descriptorsByPath,
+                                    parquetTupleDomain,
+                                    columnIndex,
+                                    Optional.empty(),
+                                    UTC,
+                                    domainCompactionThreshold)) {
+                        blocks.add(block);
+                        blockStarts.add(nextStart);
+                        columnIndexes.add(columnIndex);
+                        isBlockAdded = true;
+                    }
+                }
+                // Increment once per block (not once per domain), so blockStarts records correct row offsets
+                nextStart += block.getRowCount();
+            }

+            Optional<ReaderColumns> readerProjections = projectBaseColumns(columns, true);
+            List<HiveColumnHandle> baseColumns = readerProjections.map(projection ->
+                    projection.get().stream()
+                            .map(HiveColumnHandle.class::cast)
+                            .collect(toUnmodifiableList()))
+                    .orElse(columns);
+
+            Optional<FilterPredicate> filterPredicate = Optional.ofNullable(nullableFilterPredicate);
+            ParquetDataSourceId dataSourceId = dataSource.getId();
+            ParquetDataSource finalDataSource = dataSource;
+            ParquetPageSourceFactory.ParquetReaderProvider parquetReaderProvider = columnFields -> new ParquetReader(
+                    Optional.ofNullable(fileMetaData.getCreatedBy()),
+                    columnFields,
+                    blocks.build(),
+                    blockStarts.build(),
+                    finalDataSource,
+                    UTC,
+                    memoryContext,
+                    parquetReaderOptions,
+                    exception -> handleException(dataSourceId, exception),
+                    filterPredicate,
+                    columnIndexes.build(),
+                    parquetWriteValidation);
+            ConnectorPageSource parquetPageSource = createParquetPageSource(baseColumns, fileSchema, messageColumn, true, parquetReaderProvider);
+            pageSource = new ReaderPageSource(parquetPageSource, readerProjections);
+        }
+        catch (Exception e) {
+            try {
+                if (dataSource != null) {
+                    dataSource.close();
+                }
+            }
+            catch (IOException ignored) {
+            }
+            if (e instanceof TrinoException) {
+                throw (TrinoException) e;
+            }
+            if (e instanceof ParquetCorruptionException) {
+                throw new TrinoException(HIVE_BAD_DATA, e);
+            }
+            String message = format("Error opening Hive split %s (offset=%s, length=%s): %s", checkpoint.location(), 0, fileSize, e.getMessage());
+            throw new TrinoException(HIVE_CANNOT_OPEN_SPLIT, message, e);
+        }

verify(pageSource.getReaderColumns().isEmpty(), "All columns expected to be base columns");

@@ -209,6 +353,8 @@ public CheckpointEntryIterator(
.collect(toImmutableList());
}

+    private record ParquetDomainAndPredicate(TupleDomain<ColumnDescriptor> tupleDomain, TupleDomainParquetPredicate predicate) {}

private DeltaLakeColumnHandle buildColumnHandle(EntryType entryType, CheckpointSchemaManager schemaManager, MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
{
Type type = switch (entryType) {
@@ -189,7 +189,7 @@ private Page getColumnAdaptationsPage(Page page)
return new Page(batchSize, blocks);
}

-    static TrinoException handleException(ParquetDataSourceId dataSourceId, Exception exception)
+    public static TrinoException handleException(ParquetDataSourceId dataSourceId, Exception exception)
{
if (exception instanceof TrinoException) {
return (TrinoException) exception;
@@ -84,6 +84,7 @@
import static io.trino.parquet.ParquetTypeUtils.lookupColumnByName;
import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
import static io.trino.parquet.predicate.PredicateUtils.predicateMatches;
+import static io.trino.parquet.reader.ParquetReader.toParquetFilter;
import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
@@ -289,7 +290,7 @@ && predicateMatches(
memoryContext,
options,
exception -> handleException(dataSourceId, exception),
-                Optional.of(parquetPredicate),
+                toParquetFilter(Optional.of(parquetPredicate), timeZone, options),
columnIndexes.build(),
parquetWriteValidation);
ConnectorPageSource parquetPageSource = createParquetPageSource(baseColumns, fileSchema, messageColumn, useColumnNames, parquetReaderProvider);
@@ -75,6 +75,7 @@
import static io.trino.parquet.ParquetTypeUtils.getDescriptors;
import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
import static io.trino.parquet.predicate.PredicateUtils.predicateMatches;
+import static io.trino.parquet.reader.ParquetReader.toParquetFilter;
import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns;
import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.ParquetReaderProvider;
import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.createDataSource;
@@ -248,7 +249,7 @@ && predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parq
memoryContext,
options,
exception -> handleException(dataSourceId, exception),
-                Optional.of(parquetPredicate),
+                toParquetFilter(Optional.of(parquetPredicate), timeZone, options),
columnIndexes.build(),
Optional.empty());
return createParquetPageSource(baseColumns, fileSchema, messageColumn, useColumnNames, parquetReaderProvider);
Expand Down