Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3258,22 +3258,20 @@ public static DeltaLakeTableHandle checkValidTableHandle(ConnectorTableHandle ta

public static TupleDomain<DeltaLakeColumnHandle> createStatisticsPredicate(
AddFileEntry addFileEntry,
List<DeltaLakeColumnMetadata> schema,
List<String> canonicalPartitionColumns)
List<DeltaLakeColumnHandle> schema)
{
return addFileEntry.getStats()
.map(deltaLakeFileStatistics -> withColumnDomains(
schema.stream()
.filter(column -> canUseInPredicate(column.getColumnMetadata()))
.filter(column -> canUseInPredicate(column.getProjectionInfo().map(DeltaLakeColumnProjectionInfo::getType).orElse(column.getBaseType())))
.collect(toImmutableMap(
column -> DeltaLakeMetadata.toColumnHandle(column.getName(), column.getType(), column.getFieldId(), column.getPhysicalName(), column.getPhysicalColumnType(), canonicalPartitionColumns),
column -> buildColumnDomain(column, deltaLakeFileStatistics, canonicalPartitionColumns)))))
Function.identity(),
column -> buildColumnDomain(column, deltaLakeFileStatistics)))))
.orElseGet(TupleDomain::all);
}

private static boolean canUseInPredicate(ColumnMetadata column)
private static boolean canUseInPredicate(Type type)
{
Type type = column.getType();
return type.equals(TINYINT)
|| type.equals(SMALLINT)
|| type.equals(INTEGER)
Expand All @@ -3287,49 +3285,49 @@ private static boolean canUseInPredicate(ColumnMetadata column)
|| type.equals(VARCHAR);
}

private static Domain buildColumnDomain(DeltaLakeColumnMetadata column, DeltaLakeFileStatistics stats, List<String> canonicalPartitionColumns)
private static Domain buildColumnDomain(DeltaLakeColumnHandle column, DeltaLakeFileStatistics stats)
{
Optional<Long> nullCount = stats.getNullCount(column.getPhysicalName());
Type type = column.getProjectionInfo().map(DeltaLakeColumnProjectionInfo::getType).orElse(column.getBaseType());
Optional<Long> nullCount = stats.getNullCount(column);
if (nullCount.isEmpty()) {
// No stats were collected for this column; this can happen in 2 scenarios:
// 1. The column didn't exist in the schema when the data file was created
// 2. The column does exist in the file, but Spark property 'delta.dataSkippingNumIndexedCols'
// was used to limit the number of columns for which stats are collected
// Since we don't know which scenario we're dealing with, we can't make a decision to prune.
return Domain.all(column.getType());
return Domain.all(type);
}
if (stats.getNumRecords().equals(nullCount)) {
return Domain.onlyNull(column.getType());
return Domain.onlyNull(type);
}

boolean hasNulls = nullCount.get() > 0;
DeltaLakeColumnHandle deltaLakeColumnHandle = toColumnHandle(column.getName(), column.getType(), column.getFieldId(), column.getPhysicalName(), column.getPhysicalColumnType(), canonicalPartitionColumns);
Optional<Object> minValue = stats.getMinColumnValue(deltaLakeColumnHandle);
if (minValue.isPresent() && isFloatingPointNaN(column.getType(), minValue.get())) {
return allValues(column.getType(), hasNulls);
Optional<Object> minValue = stats.getMinColumnValue(column);
if (minValue.isPresent() && isFloatingPointNaN(type, minValue.get())) {
return allValues(type, hasNulls);
}
if (isNotFinite(minValue, column.getType())) {
if (isNotFinite(minValue, type)) {
minValue = Optional.empty();
}
Optional<Object> maxValue = stats.getMaxColumnValue(deltaLakeColumnHandle);
if (maxValue.isPresent() && isFloatingPointNaN(column.getType(), maxValue.get())) {
return allValues(column.getType(), hasNulls);
Optional<Object> maxValue = stats.getMaxColumnValue(column);
if (maxValue.isPresent() && isFloatingPointNaN(type, maxValue.get())) {
return allValues(type, hasNulls);
}
if (isNotFinite(maxValue, column.getType())) {
if (isNotFinite(maxValue, type)) {
maxValue = Optional.empty();
}
if (minValue.isPresent() && maxValue.isPresent()) {
return Domain.create(
ofRanges(range(column.getType(), minValue.get(), true, maxValue.get(), true)),
ofRanges(range(type, minValue.get(), true, maxValue.get(), true)),
hasNulls);
}
if (minValue.isPresent()) {
return Domain.create(ofRanges(greaterThanOrEqual(column.getType(), minValue.get())), hasNulls);
return Domain.create(ofRanges(greaterThanOrEqual(type, minValue.get())), hasNulls);
}

return maxValue
.map(value -> Domain.create(ofRanges(lessThanOrEqual(column.getType(), value)), hasNulls))
.orElseGet(() -> Domain.all(column.getType()));
.map(value -> Domain.create(ofRanges(lessThanOrEqual(type, value)), hasNulls))
.orElseGet(() -> Domain.all(type));
}

private static boolean isNotFinite(Optional<Object> value, Type type)
Expand Down Expand Up @@ -3359,7 +3357,7 @@ private static Domain allValues(Type type, boolean includeNull)
return Domain.notNull(type);
}

private static DeltaLakeColumnHandle toColumnHandle(String originalName, Type type, OptionalInt fieldId, String physicalName, Type physicalType, Collection<String> partitionColumns)
public static DeltaLakeColumnHandle toColumnHandle(String originalName, Type type, OptionalInt fieldId, String physicalName, Type physicalType, Collection<String> partitionColumns)
{
boolean isPartitionKey = partitionColumns.stream().anyMatch(partition -> partition.equalsIgnoreCase(originalName));
return new DeltaLakeColumnHandle(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode.FULL_REFRESH;
import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.pathColumnHandle;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.createStatisticsPredicate;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.toColumnHandle;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getDynamicFilteringWaitTimeout;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getMaxInitialSplitSize;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getMaxSplitSize;
Expand Down Expand Up @@ -169,13 +170,14 @@ private Stream<DeltaLakeSplit> getSplits(

Set<String> predicatedColumnNames = Stream.concat(
nonPartitionConstraint.getDomains().orElseThrow().keySet().stream(),
columnsCoveredByDynamicFilter.stream()
.map(DeltaLakeColumnHandle.class::cast))
columnsCoveredByDynamicFilter.stream()
.map(DeltaLakeColumnHandle.class::cast))
.map(DeltaLakeColumnHandle::getBaseColumnName)
.collect(toImmutableSet());
List<DeltaLakeColumnMetadata> schema = extractSchema(tableHandle.getMetadataEntry(), typeManager);
List<DeltaLakeColumnMetadata> predicatedColumns = schema.stream()
.filter(column -> predicatedColumnNames.contains(column.getName()))
List<DeltaLakeColumnHandle> predicatedColumns = schema.stream()
.filter(column -> predicatedColumnNames.contains(column.getName())) // DeltaLakeColumnMetadata.name is lowercase
.map(column -> toColumnHandle(column.getName(), column.getType(), column.getFieldId(), column.getPhysicalName(), column.getPhysicalColumnType(), tableHandle.getMetadataEntry().getOriginalPartitionColumns()))
.collect(toImmutableList());
return validDataFiles.stream()
.flatMap(addAction -> {
Expand Down Expand Up @@ -205,8 +207,7 @@ private Stream<DeltaLakeSplit> getSplits(

TupleDomain<DeltaLakeColumnHandle> statisticsPredicate = createStatisticsPredicate(
addAction,
predicatedColumns,
tableHandle.getMetadataEntry().getLowercasePartitionColumns());
predicatedColumns);
if (!nonPartitionConstraint.overlaps(statisticsPredicate)) {
return Stream.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,11 @@
import java.util.function.Function;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.flattenNestedKeyMap;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.hasInvalidStatistics;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.jsonEncodeMax;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.jsonEncodeMin;
Expand Down Expand Up @@ -202,6 +203,15 @@ public DataFileInfo getDataFileInfo()
private static DeltaLakeJsonFileStatistics readStatistics(TrinoInputFile inputFile, Map</* lowercase */ String, Type> typeForColumn, long rowCount)
throws IOException
{
ImmutableMap.Builder<String, Type> typeForProjectedColumnBuilder = ImmutableMap.builder();
for (Map.Entry<String, Type> column : typeForColumn.entrySet()) {
String columnName = column.getKey();
Type columnType = column.getValue();
populateType(columnName, columnType, typeForProjectedColumnBuilder);
}

Map<String, Type> typeForProjectedColumn = typeForProjectedColumnBuilder.buildOrThrow();

try (TrinoParquetDataSource trinoParquetDataSource = new TrinoParquetDataSource(
inputFile,
new ParquetReaderOptions(),
Expand All @@ -211,15 +221,15 @@ private static DeltaLakeJsonFileStatistics readStatistics(TrinoInputFile inputFi
ImmutableMultimap.Builder<String, ColumnChunkMetaData> metadataForColumn = ImmutableMultimap.builder();
for (BlockMetaData blockMetaData : parquetMetadata.getBlocks()) {
for (ColumnChunkMetaData columnChunkMetaData : blockMetaData.getColumns()) {
if (columnChunkMetaData.getPath().size() != 1) {
continue; // Only base column stats are supported
checkArgument(columnChunkMetaData.getPath().size() > 0, "columnChunkMetaData path must not be empty");
String columnName = columnChunkMetaData.getPath().toDotString();
if (typeForProjectedColumn.containsKey(columnName)) {
metadataForColumn.put(columnName, columnChunkMetaData);
}
String columnName = getOnlyElement(columnChunkMetaData.getPath());
metadataForColumn.put(columnName, columnChunkMetaData);
}
}

return mergeStats(metadataForColumn.build(), typeForColumn, rowCount);
return mergeStats(metadataForColumn.build(), typeForProjectedColumn, rowCount);
}
}

Expand All @@ -229,15 +239,18 @@ static DeltaLakeJsonFileStatistics mergeStats(Multimap<String, ColumnChunkMetaDa
Map<String, Optional<Statistics<?>>> statsForColumn = metadataForColumn.keySet().stream()
.collect(toImmutableMap(identity(), key -> mergeMetadataList(metadataForColumn.get(key))));

Map<String, Object> nullCount = statsForColumn.entrySet().stream()
Map<String, Optional<Object>> nullCount = statsForColumn.entrySet().stream()
.filter(entry -> entry.getValue().isPresent())
.collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().get().getNumNulls()));
.collect(toImmutableMap(Map.Entry::getKey, entry -> Optional.of(entry.getValue().get().getNumNulls())));

// TODO Databricks collects nullCount stats for Map and Array types as well, whereas Trino does not
Map<String, Object> flattenedNullCount = flattenNestedKeyMap(nullCount);

return new DeltaLakeJsonFileStatistics(
Optional.of(rowCount),
Optional.of(jsonEncodeMin(statsForColumn, typeForColumn)),
Optional.of(jsonEncodeMax(statsForColumn, typeForColumn)),
Optional.of(nullCount));
Optional.of(flattenedNullCount));
}

private static Optional<Statistics<?>> mergeMetadataList(Collection<ColumnChunkMetaData> metadataList)
Expand All @@ -254,6 +267,20 @@ private static Optional<Statistics<?>> mergeMetadataList(Collection<ColumnChunkM
});
}

/**
 * Records the type of every non-row leaf reachable from a column, keyed by its dot-separated path.
 * <p>
 * A non-row {@code type} is stored directly under {@code name}. A {@link RowType} is not stored
 * itself; each of its fields is visited recursively under {@code name + "." + fieldName}, so
 * nested rows produce one entry per innermost non-row field.
 *
 * @param name path accumulated so far (the column name at the top-level call)
 * @param type type found at {@code name}
 * @param typeForProjectedColumn builder receiving the {@code path -> type} entries
 * @throws java.util.NoSuchElementException if a row field has no name
 */
private static void populateType(String name, Type type, ImmutableMap.Builder<String, Type> typeForProjectedColumn)
{
    if (!(type instanceof RowType rowType)) {
        // Leaf reached: register it under the full dotted path
        typeForProjectedColumn.put(name, type);
        return;
    }

    // Row type: descend into each field, extending the path with the field's name
    for (RowType.Field field : rowType.getFields()) {
        String fieldName = field.getName().orElseThrow();
        populateType(name + "." + fieldName, field.getType(), typeForProjectedColumn);
    }
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static io.trino.plugin.deltalake.DeltaLakeColumnType.PARTITION_KEY;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.createStatisticsPredicate;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.toColumnHandle;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isExtendedStatisticsEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSplitManager.partitionMatchesPredicate;
import static io.trino.spi.statistics.StatsUtil.toStatsRepresentation;
Expand Down Expand Up @@ -102,12 +103,11 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab
}

Set<String> predicatedColumnNames = tableHandle.getNonPartitionConstraint().getDomains().orElseThrow().keySet().stream()
// TODO Statistics for column inside complex type is not collected (https://github.com/trinodb/trino/issues/17164)
.filter(DeltaLakeColumnHandle::isBaseColumn)
.map(DeltaLakeColumnHandle::getBaseColumnName)
.collect(toImmutableSet());
List<DeltaLakeColumnMetadata> predicatedColumns = columnMetadata.stream()
List<DeltaLakeColumnHandle> predicatedColumns = columnMetadata.stream()
.filter(column -> predicatedColumnNames.contains(column.getName()))
.map(column -> toColumnHandle(column.getName(), column.getType(), column.getFieldId(), column.getPhysicalName(), column.getPhysicalColumnType(), tableHandle.getMetadataEntry().getOriginalPartitionColumns()))
.collect(toImmutableList());

for (AddFileEntry addEntry : transactionLogAccess.getActiveFiles(tableSnapshot, session)) {
Expand All @@ -123,8 +123,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab

TupleDomain<DeltaLakeColumnHandle> statisticsPredicate = createStatisticsPredicate(
addEntry,
predicatedColumns,
tableHandle.getMetadataEntry().getLowercasePartitionColumns());
predicatedColumns);
if (!tableHandle.getNonPartitionConstraint().overlaps(statisticsPredicate)) {
continue;
}
Expand All @@ -148,7 +147,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab
}
}
else {
Optional<Long> maybeNullCount = column.isBaseColumn() ? stats.getNullCount(column.getBasePhysicalColumnName()) : Optional.empty();
Optional<Long> maybeNullCount = stats.getNullCount(column);
if (maybeNullCount.isPresent()) {
nullCounts.put(column, nullCounts.get(column) + maybeNullCount.get());
}
Expand Down
Loading