Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2949,22 +2949,20 @@ public static DeltaLakeTableHandle checkValidTableHandle(ConnectorTableHandle ta

/**
 * Builds a per-file predicate from the statistics recorded on an add-file entry.
 *
 * When the entry carries statistics, every column in {@code schema} whose type passes
 * {@code canUseInPredicate} is mapped to the domain computed by {@code buildColumnDomain},
 * and the resulting map is wrapped with {@code withColumnDomains}. When the entry has no
 * statistics, {@code TupleDomain.all()} is returned.
 *
 * NOTE(review): this span appears to be rendered diff text, so each superseded line is
 * immediately followed by its replacement (e.g. the two {@code schema} parameter lines) --
 * confirm against the actual file before applying.
 */
public static TupleDomain<DeltaLakeColumnHandle> createStatisticsPredicate(
AddFileEntry addFileEntry,
List<DeltaLakeColumnMetadata> schema,
List<String> canonicalPartitionColumns)
List<DeltaLakeColumnHandle> schema)
{
return addFileEntry.getStats()
.map(deltaLakeFileStatistics -> withColumnDomains(
schema.stream()
.filter(column -> canUseInPredicate(column.getColumnMetadata()))
// The type checked is the projected type when projection info is present, otherwise the base type
.filter(column -> canUseInPredicate(column.getProjectionInfo().map(DeltaLakeColumnProjectionInfo::getType).orElse(column.getBaseType())))
.collect(toImmutableMap(
column -> DeltaLakeMetadata.toColumnHandle(column.getColumnMetadata(), column.getFieldId(), column.getPhysicalName(), column.getPhysicalColumnType(), canonicalPartitionColumns),
column -> buildColumnDomain(column, deltaLakeFileStatistics, canonicalPartitionColumns)))))
Function.identity(),
column -> buildColumnDomain(column, deltaLakeFileStatistics)))))
.orElseGet(TupleDomain::all);
}

private static boolean canUseInPredicate(ColumnMetadata column)
private static boolean canUseInPredicate(Type type)
{
Type type = column.getType();
return type.equals(TINYINT)
|| type.equals(SMALLINT)
|| type.equals(INTEGER)
Expand All @@ -2978,49 +2976,49 @@ private static boolean canUseInPredicate(ColumnMetadata column)
|| type.equals(VARCHAR);
}

/**
 * Computes the {@code Domain} for one column from a data file's statistics.
 *
 * Outcomes, in the order they are checked:
 * - no null count recorded for the column: {@code Domain.all(type)};
 * - null count equals the record count: {@code Domain.onlyNull(type)};
 * - min or max is a floating-point NaN: {@code allValues(type, hasNulls)};
 * - both min and max usable: a single range from min to max (both bound flags are {@code true});
 * - only min usable: {@code greaterThanOrEqual(min)}; only max usable: {@code lessThanOrEqual(max)};
 * - neither usable: {@code Domain.all(type)}.
 * A min or max for which {@code isNotFinite} returns true is treated as absent.
 *
 * NOTE(review): this span appears to be rendered diff text, so each superseded line is
 * immediately followed by its replacement -- confirm against the actual file before applying.
 */
private static Domain buildColumnDomain(DeltaLakeColumnMetadata column, DeltaLakeFileStatistics stats, List<String> canonicalPartitionColumns)
private static Domain buildColumnDomain(DeltaLakeColumnHandle column, DeltaLakeFileStatistics stats)
{
Optional<Long> nullCount = stats.getNullCount(column.getPhysicalName());
// Use the projected type when projection info is present, otherwise the base column type
Type type = column.getProjectionInfo().map(DeltaLakeColumnProjectionInfo::getType).orElse(column.getBaseType());
Optional<Long> nullCount = stats.getNullCount(column);
if (nullCount.isEmpty()) {
// No stats were collected for this column; this can happen in 2 scenarios:
// 1. The column didn't exist in the schema when the data file was created
// 2. The column does exist in the file, but Spark property 'delta.dataSkippingNumIndexedCols'
// was used to limit the number of columns for which stats are collected
// Since we don't know which scenario we're dealing with, we can't make a decision to prune.
return Domain.all(column.getType());
return Domain.all(type);
}
// Every record in the file is null for this column
if (stats.getNumRecords().equals(nullCount)) {
return Domain.onlyNull(column.getType());
return Domain.onlyNull(type);
}

boolean hasNulls = nullCount.get() > 0;
DeltaLakeColumnHandle deltaLakeColumnHandle = toColumnHandle(column.getColumnMetadata(), column.getFieldId(), column.getPhysicalName(), column.getPhysicalColumnType(), canonicalPartitionColumns);
Optional<Object> minValue = stats.getMinColumnValue(deltaLakeColumnHandle);
if (minValue.isPresent() && isFloatingPointNaN(column.getType(), minValue.get())) {
return allValues(column.getType(), hasNulls);
Optional<Object> minValue = stats.getMinColumnValue(column);
// A NaN bound cannot delimit a range, so fall back to allValues
if (minValue.isPresent() && isFloatingPointNaN(type, minValue.get())) {
return allValues(type, hasNulls);
}
if (isNotFinite(minValue, column.getType())) {
// A non-finite minimum is discarded and handled as if no minimum was recorded
if (isNotFinite(minValue, type)) {
minValue = Optional.empty();
}
Optional<Object> maxValue = stats.getMaxColumnValue(deltaLakeColumnHandle);
if (maxValue.isPresent() && isFloatingPointNaN(column.getType(), maxValue.get())) {
return allValues(column.getType(), hasNulls);
Optional<Object> maxValue = stats.getMaxColumnValue(column);
if (maxValue.isPresent() && isFloatingPointNaN(type, maxValue.get())) {
return allValues(type, hasNulls);
}
if (isNotFinite(maxValue, column.getType())) {
// Same treatment for a non-finite maximum
if (isNotFinite(maxValue, type)) {
maxValue = Optional.empty();
}
if (minValue.isPresent() && maxValue.isPresent()) {
return Domain.create(
ofRanges(range(column.getType(), minValue.get(), true, maxValue.get(), true)),
ofRanges(range(type, minValue.get(), true, maxValue.get(), true)),
hasNulls);
}
if (minValue.isPresent()) {
return Domain.create(ofRanges(greaterThanOrEqual(column.getType(), minValue.get())), hasNulls);
return Domain.create(ofRanges(greaterThanOrEqual(type, minValue.get())), hasNulls);
}

// Only a maximum (or nothing) is left at this point
return maxValue
.map(value -> Domain.create(ofRanges(lessThanOrEqual(column.getType(), value)), hasNulls))
.orElseGet(() -> Domain.all(column.getType()));
.map(value -> Domain.create(ofRanges(lessThanOrEqual(type, value)), hasNulls))
.orElseGet(() -> Domain.all(type));
}

private static boolean isNotFinite(Optional<Object> value, Type type)
Expand Down Expand Up @@ -3055,7 +3053,7 @@ private static DeltaLakeColumnHandle toColumnHandle(ColumnMetadata column, Strin
return toColumnHandle(column, OptionalInt.empty(), physicalName, physicalType, partitionColumns);
}

private static DeltaLakeColumnHandle toColumnHandle(ColumnMetadata column, OptionalInt fieldId, String physicalName, Type physicalType, Collection<String> partitionColumns)
public static DeltaLakeColumnHandle toColumnHandle(ColumnMetadata column, OptionalInt fieldId, String physicalName, Type physicalType, Collection<String> partitionColumns)
{
boolean isPartitionKey = partitionColumns.stream().anyMatch(partition -> partition.equalsIgnoreCase(column.getName()));
return new DeltaLakeColumnHandle(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource;
Expand All @@ -37,6 +38,8 @@
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.ptf.ConnectorTableFunctionHandle;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;

import javax.inject.Inject;
Expand All @@ -51,11 +54,11 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Stream;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.pathColumnHandle;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.createStatisticsPredicate;
Expand Down Expand Up @@ -167,15 +170,20 @@ private Stream<DeltaLakeSplit> getSplits(
Optional<Instant> filesModifiedAfter = tableHandle.getAnalyzeHandle().flatMap(AnalyzeHandle::getFilesModifiedAfter);
Optional<Long> maxScannedFileSizeInBytes = maxScannedFileSize.map(DataSize::toBytes);

Set<String> predicatedColumnNames = Stream.concat(
Map<String, DeltaLakeColumnHandle> predicatedColumnHandles = Stream.concat(
nonPartitionConstraint.getDomains().orElseThrow().keySet().stream(),
columnsCoveredByDynamicFilter.stream()
.map(DeltaLakeColumnHandle.class::cast))
.map(column -> column.getBaseColumnName().toLowerCase(ENGLISH)) // TODO is DeltaLakeColumnHandle.name normalized?
.collect(toImmutableSet());
List<DeltaLakeColumnMetadata> schema = extractSchema(tableHandle.getMetadataEntry(), typeManager);
List<DeltaLakeColumnMetadata> predicatedColumns = schema.stream()
.filter(column -> predicatedColumnNames.contains(column.getName())) // DeltaLakeColumnMetadata.name is lowercase
.distinct()
// DeltaLakeColumnMetadata.name is lowercase
.collect(toImmutableMap(column -> column.getQualifiedPhysicalName().toLowerCase(ENGLISH), Function.identity())); // TODO is DeltaLakeColumnHandle.name normalized?

Set<String> predicatedColumnNames = predicatedColumnHandles.keySet();
Set<String> projectedColumnNames = projectColumnNames(extractSchema(tableHandle.getMetadataEntry(), typeManager));

List<DeltaLakeColumnHandle> predicatedColumns = projectedColumnNames.stream()
.filter(column -> predicatedColumnNames.contains(column))
.map(column -> predicatedColumnHandles.get(column))
.collect(toImmutableList());

return validDataFiles.stream()
Expand Down Expand Up @@ -205,8 +213,7 @@ private Stream<DeltaLakeSplit> getSplits(

TupleDomain<DeltaLakeColumnHandle> statisticsPredicate = createStatisticsPredicate(
addAction,
predicatedColumns,
tableHandle.getMetadataEntry().getCanonicalPartitionColumns());
predicatedColumns);
if (!nonPartitionConstraint.overlaps(statisticsPredicate)) {
return Stream.empty();
}
Expand Down Expand Up @@ -236,6 +243,27 @@ private Stream<DeltaLakeSplit> getSplits(
});
}

/**
 * Collects the projected column names for every column in {@code schema}.
 *
 * Each column is expanded by {@code projectColumns}, starting from its physical name and
 * physical column type; all names produced are gathered into one immutable set.
 *
 * @param schema the table columns to expand
 * @return the set of names emitted by {@code projectColumns} for all columns
 */
private Set<String> projectColumnNames(List<DeltaLakeColumnMetadata> schema)
{
    ImmutableSet.Builder<String> collectedNames = ImmutableSet.builder();
    schema.forEach(column -> projectColumns(column.getPhysicalName(), column.getPhysicalColumnType(), collectedNames));
    return collectedNames.build();
}

/**
 * Recursively adds the leaf names found under {@code name} to {@code projectedColumnNames}.
 *
 * A non-row type contributes {@code name} itself. A row type contributes nothing directly;
 * instead each of its fields is visited with the name {@code name + "#" + fieldName}.
 * A row field without a name makes {@code orElseThrow()} fail.
 *
 * NOTE(review): names are added exactly as given, with no case normalization, while the
 * lookup keys built in getSplits are lower-cased -- confirm the two always agree.
 *
 * @param name the (possibly already nested) column name accumulated so far
 * @param type the type of the column or field identified by {@code name}
 * @param projectedColumnNames the builder receiving the leaf names
 */
private void projectColumns(String name, Type type, ImmutableSet.Builder<String> projectedColumnNames)
{
    if (!(type instanceof RowType rowType)) {
        // Leaf reached: there is nothing to descend into
        projectedColumnNames.add(name);
        return;
    }

    rowType.getFields().forEach(field -> {
        String nestedName = name + "#" + field.getName().orElseThrow();
        projectColumns(nestedName, field.getType(), projectedColumnNames);
    });
}

private static boolean mayAnyDataColumnProjected(DeltaLakeTableHandle tableHandle)
{
if (tableHandle.getProjectedColumns().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,11 @@
import java.util.function.Function;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.convertNestedMapKeys;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.hasInvalidStatistics;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.jsonEncodeMax;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.jsonEncodeMin;
Expand Down Expand Up @@ -200,11 +201,15 @@ public DataFileInfo getDataFileInfo()
private static DeltaLakeJsonFileStatistics readStatistics(TrinoInputFile inputFile, List<String> dataColumnNames, List<Type> dataColumnTypes, long rowCount)
throws IOException
{
ImmutableMap.Builder<String, Type> typeForColumn = ImmutableMap.builder();
ImmutableMap.Builder<String, Type> typeForProjectedColumnBuilder = ImmutableMap.builder();
for (int i = 0; i < dataColumnNames.size(); i++) {
typeForColumn.put(dataColumnNames.get(i), dataColumnTypes.get(i));
String columnName = dataColumnNames.get(i);
Type columnType = dataColumnTypes.get(i);
populateType(columnName, columnType, typeForProjectedColumnBuilder);
}

Map<String, Type> typeForProjectedColumn = typeForProjectedColumnBuilder.buildOrThrow();

try (TrinoParquetDataSource trinoParquetDataSource = new TrinoParquetDataSource(
inputFile,
new ParquetReaderOptions(),
Expand All @@ -214,15 +219,15 @@ private static DeltaLakeJsonFileStatistics readStatistics(TrinoInputFile inputFi
ImmutableMultimap.Builder<String, ColumnChunkMetaData> metadataForColumn = ImmutableMultimap.builder();
for (BlockMetaData blockMetaData : parquetMetadata.getBlocks()) {
for (ColumnChunkMetaData columnChunkMetaData : blockMetaData.getColumns()) {
if (columnChunkMetaData.getPath().size() != 1) {
continue; // Only base column stats are supported
checkArgument(columnChunkMetaData.getPath().size() > 0, "columnChunkMetaData path must not be empty");
String columnName = columnChunkMetaData.getPath().toDotString();
if (typeForProjectedColumn.containsKey(columnName)) {
metadataForColumn.put(columnName, columnChunkMetaData);
}
String columnName = getOnlyElement(columnChunkMetaData.getPath());
metadataForColumn.put(columnName, columnChunkMetaData);
}
}

return mergeStats(metadataForColumn.build(), typeForColumn.buildOrThrow(), rowCount);
return mergeStats(metadataForColumn.build(), typeForProjectedColumn, rowCount);
}
}

Expand All @@ -232,15 +237,18 @@ static DeltaLakeJsonFileStatistics mergeStats(Multimap<String, ColumnChunkMetaDa
Map<String, Optional<Statistics<?>>> statsForColumn = metadataForColumn.keySet().stream()
.collect(toImmutableMap(identity(), key -> mergeMetadataList(metadataForColumn.get(key))));

Map<String, Object> nullCount = statsForColumn.entrySet().stream()
Map<String, Optional<Object>> nullCount = statsForColumn.entrySet().stream()
.filter(entry -> entry.getValue().isPresent())
.collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().get().getNumNulls()));
.collect(toImmutableMap(Map.Entry::getKey, entry -> Optional.of(entry.getValue().get().getNumNulls())));

// TODO Databricks collect nullCount stats for Map and Array type as well, whereas trino does not collect
Map<String, Object> convertedNullCount = convertNestedMapKeys(nullCount);

return new DeltaLakeJsonFileStatistics(
Optional.of(rowCount),
Optional.of(jsonEncodeMin(statsForColumn, typeForColumn)),
Optional.of(jsonEncodeMax(statsForColumn, typeForColumn)),
Optional.of(nullCount));
Optional.of(convertedNullCount));
}

private static Optional<Statistics<?>> mergeMetadataList(Collection<ColumnChunkMetaData> metadataList)
Expand All @@ -257,6 +265,20 @@ private static Optional<Statistics<?>> mergeMetadataList(Collection<ColumnChunkM
});
}

/**
 * Recursively registers the type of every leaf field under {@code name}.
 *
 * A non-row type is stored under {@code name} itself. A row type is not stored; each of
 * its fields is visited instead, using the dotted name {@code name + "." + fieldName}.
 * A row field without a name makes {@code orElseThrow()} fail.
 *
 * @param name the (possibly already nested) column name accumulated so far
 * @param type the type of the column or field identified by {@code name}
 * @param typeForProjectedColumn the builder receiving the name-to-type entries
 */
private static void populateType(String name, Type type, ImmutableMap.Builder<String, Type> typeForProjectedColumn)
{
    if (!(type instanceof RowType rowType)) {
        // Leaf reached: record its type under the accumulated name
        typeForProjectedColumn.put(name, type);
        return;
    }

    rowType.getFields().forEach(field ->
            populateType(name + "." + field.getName().orElseThrow(), field.getType(), typeForProjectedColumn));
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static io.trino.plugin.deltalake.DeltaLakeColumnType.PARTITION_KEY;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.createStatisticsPredicate;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.toColumnHandle;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isExtendedStatisticsEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSplitManager.partitionMatchesPredicate;
import static io.trino.spi.statistics.StatsUtil.toStatsRepresentation;
Expand Down Expand Up @@ -105,8 +106,9 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab
Set<String> predicatedColumnNames = tableHandle.getNonPartitionConstraint().getDomains().orElseThrow().keySet().stream()
.map(DeltaLakeColumnHandle::getBaseColumnName)
.collect(toImmutableSet());
List<DeltaLakeColumnMetadata> predicatedColumns = columnMetadata.stream()
List<DeltaLakeColumnHandle> predicatedColumns = columnMetadata.stream()
.filter(column -> predicatedColumnNames.contains(column.getName()))
.map(column -> toColumnHandle(column.getColumnMetadata(), column.getFieldId(), column.getPhysicalName(), column.getPhysicalColumnType(), tableHandle.getMetadataEntry().getCanonicalPartitionColumns()))
.collect(toImmutableList());

for (AddFileEntry addEntry : transactionLogAccess.getActiveFiles(tableSnapshot, session)) {
Expand All @@ -122,8 +124,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab

TupleDomain<DeltaLakeColumnHandle> statisticsPredicate = createStatisticsPredicate(
addEntry,
predicatedColumns,
tableHandle.getMetadataEntry().getCanonicalPartitionColumns());
predicatedColumns);
if (!tableHandle.getNonPartitionConstraint().overlaps(statisticsPredicate)) {
continue;
}
Expand All @@ -147,7 +148,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab
}
}
else {
Optional<Long> maybeNullCount = column.isBaseColumn() ? stats.getNullCount(column.getBasePhysicalColumnName()) : Optional.empty();
Optional<Long> maybeNullCount = column.isBaseColumn() ? stats.getNullCount(column) : Optional.empty();
if (maybeNullCount.isPresent()) {
nullCounts.put(column, nullCounts.get(column) + maybeNullCount.get());
}
Expand Down
Loading