diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index dee4205f7396..d94ad3742672 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -2949,22 +2949,20 @@ public static DeltaLakeTableHandle checkValidTableHandle(ConnectorTableHandle ta public static TupleDomain createStatisticsPredicate( AddFileEntry addFileEntry, - List schema, - List canonicalPartitionColumns) + List schema) { return addFileEntry.getStats() .map(deltaLakeFileStatistics -> withColumnDomains( schema.stream() - .filter(column -> canUseInPredicate(column.getColumnMetadata())) + .filter(column -> canUseInPredicate(column.getProjectionInfo().map(DeltaLakeColumnProjectionInfo::getType).orElse(column.getBaseType()))) .collect(toImmutableMap( - column -> DeltaLakeMetadata.toColumnHandle(column.getColumnMetadata(), column.getFieldId(), column.getPhysicalName(), column.getPhysicalColumnType(), canonicalPartitionColumns), - column -> buildColumnDomain(column, deltaLakeFileStatistics, canonicalPartitionColumns))))) + Function.identity(), + column -> buildColumnDomain(column, deltaLakeFileStatistics))))) .orElseGet(TupleDomain::all); } - private static boolean canUseInPredicate(ColumnMetadata column) + private static boolean canUseInPredicate(Type type) { - Type type = column.getType(); return type.equals(TINYINT) || type.equals(SMALLINT) || type.equals(INTEGER) @@ -2978,49 +2976,49 @@ private static boolean canUseInPredicate(ColumnMetadata column) || type.equals(VARCHAR); } - private static Domain buildColumnDomain(DeltaLakeColumnMetadata column, DeltaLakeFileStatistics stats, List canonicalPartitionColumns) + private static Domain buildColumnDomain(DeltaLakeColumnHandle column, DeltaLakeFileStatistics stats) { - Optional 
nullCount = stats.getNullCount(column.getPhysicalName()); + Type type = column.getProjectionInfo().map(DeltaLakeColumnProjectionInfo::getType).orElse(column.getBaseType()); + Optional nullCount = stats.getNullCount(column); if (nullCount.isEmpty()) { // No stats were collected for this column; this can happen in 2 scenarios: // 1. The column didn't exist in the schema when the data file was created // 2. The column does exist in the file, but Spark property 'delta.dataSkippingNumIndexedCols' // was used to limit the number of columns for which stats are collected // Since we don't know which scenario we're dealing with, we can't make a decision to prune. - return Domain.all(column.getType()); + return Domain.all(type); } if (stats.getNumRecords().equals(nullCount)) { - return Domain.onlyNull(column.getType()); + return Domain.onlyNull(type); } boolean hasNulls = nullCount.get() > 0; - DeltaLakeColumnHandle deltaLakeColumnHandle = toColumnHandle(column.getColumnMetadata(), column.getFieldId(), column.getPhysicalName(), column.getPhysicalColumnType(), canonicalPartitionColumns); - Optional minValue = stats.getMinColumnValue(deltaLakeColumnHandle); - if (minValue.isPresent() && isFloatingPointNaN(column.getType(), minValue.get())) { - return allValues(column.getType(), hasNulls); + Optional minValue = stats.getMinColumnValue(column); + if (minValue.isPresent() && isFloatingPointNaN(type, minValue.get())) { + return allValues(type, hasNulls); } - if (isNotFinite(minValue, column.getType())) { + if (isNotFinite(minValue, type)) { minValue = Optional.empty(); } - Optional maxValue = stats.getMaxColumnValue(deltaLakeColumnHandle); - if (maxValue.isPresent() && isFloatingPointNaN(column.getType(), maxValue.get())) { - return allValues(column.getType(), hasNulls); + Optional maxValue = stats.getMaxColumnValue(column); + if (maxValue.isPresent() && isFloatingPointNaN(type, maxValue.get())) { + return allValues(type, hasNulls); } - if (isNotFinite(maxValue, column.getType())) 
{ + if (isNotFinite(maxValue, type)) { maxValue = Optional.empty(); } if (minValue.isPresent() && maxValue.isPresent()) { return Domain.create( - ofRanges(range(column.getType(), minValue.get(), true, maxValue.get(), true)), + ofRanges(range(type, minValue.get(), true, maxValue.get(), true)), hasNulls); } if (minValue.isPresent()) { - return Domain.create(ofRanges(greaterThanOrEqual(column.getType(), minValue.get())), hasNulls); + return Domain.create(ofRanges(greaterThanOrEqual(type, minValue.get())), hasNulls); } return maxValue - .map(value -> Domain.create(ofRanges(lessThanOrEqual(column.getType(), value)), hasNulls)) - .orElseGet(() -> Domain.all(column.getType())); + .map(value -> Domain.create(ofRanges(lessThanOrEqual(type, value)), hasNulls)) + .orElseGet(() -> Domain.all(type)); } private static boolean isNotFinite(Optional value, Type type) @@ -3055,7 +3053,7 @@ private static DeltaLakeColumnHandle toColumnHandle(ColumnMetadata column, Strin return toColumnHandle(column, OptionalInt.empty(), physicalName, physicalType, partitionColumns); } - private static DeltaLakeColumnHandle toColumnHandle(ColumnMetadata column, OptionalInt fieldId, String physicalName, Type physicalType, Collection partitionColumns) + public static DeltaLakeColumnHandle toColumnHandle(ColumnMetadata column, OptionalInt fieldId, String physicalName, Type physicalType, Collection partitionColumns) { boolean isPartitionKey = partitionColumns.stream().anyMatch(partition -> partition.equalsIgnoreCase(column.getName())); return new DeltaLakeColumnHandle( diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java index 77cbcd0260ce..a4ceb73169f3 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java @@ -14,6 
+14,7 @@ package io.trino.plugin.deltalake; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import io.airlift.units.DataSize; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource; @@ -37,6 +38,8 @@ import io.trino.spi.predicate.NullableValue; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.ptf.ConnectorTableFunctionHandle; +import io.trino.spi.type.RowType; +import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; import javax.inject.Inject; @@ -51,11 +54,11 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.stream.Stream; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; -import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.airlift.slice.Slices.utf8Slice; import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.pathColumnHandle; import static io.trino.plugin.deltalake.DeltaLakeMetadata.createStatisticsPredicate; @@ -167,15 +170,20 @@ private Stream getSplits( Optional filesModifiedAfter = tableHandle.getAnalyzeHandle().flatMap(AnalyzeHandle::getFilesModifiedAfter); Optional maxScannedFileSizeInBytes = maxScannedFileSize.map(DataSize::toBytes); - Set predicatedColumnNames = Stream.concat( + Map predicatedColumnHandles = Stream.concat( nonPartitionConstraint.getDomains().orElseThrow().keySet().stream(), columnsCoveredByDynamicFilter.stream() .map(DeltaLakeColumnHandle.class::cast)) - .map(column -> column.getBaseColumnName().toLowerCase(ENGLISH)) // TODO is DeltaLakeColumnHandle.name normalized? 
- .collect(toImmutableSet()); - List schema = extractSchema(tableHandle.getMetadataEntry(), typeManager); - List predicatedColumns = schema.stream() - .filter(column -> predicatedColumnNames.contains(column.getName())) // DeltaLakeColumnMetadata.name is lowercase + .distinct() + // DeltaLakeColumnMetadata.name is lowercase + .collect(toImmutableMap(column -> column.getQualifiedPhysicalName().toLowerCase(ENGLISH), Function.identity())); // TODO is DeltaLakeColumnHandle.name normalized? + + Set predicatedColumnNames = predicatedColumnHandles.keySet(); + Set projectedColumnNames = projectColumnNames(extractSchema(tableHandle.getMetadataEntry(), typeManager)); + + List predicatedColumns = projectedColumnNames.stream() + .filter(column -> predicatedColumnNames.contains(column)) + .map(column -> predicatedColumnHandles.get(column)) .collect(toImmutableList()); return validDataFiles.stream() @@ -205,8 +213,7 @@ private Stream getSplits( TupleDomain statisticsPredicate = createStatisticsPredicate( addAction, - predicatedColumns, - tableHandle.getMetadataEntry().getCanonicalPartitionColumns()); + predicatedColumns); if (!nonPartitionConstraint.overlaps(statisticsPredicate)) { return Stream.empty(); } @@ -236,6 +243,27 @@ private Stream getSplits( }); } + private Set projectColumnNames(List schema) + { + ImmutableSet.Builder projectedColumnNames = ImmutableSet.builder(); + for (DeltaLakeColumnMetadata column : schema) { + projectColumns(column.getPhysicalName(), column.getPhysicalColumnType(), projectedColumnNames); + } + return projectedColumnNames.build(); + } + + private void projectColumns(String name, Type type, ImmutableSet.Builder projectedColumnNames) + { + if (type instanceof RowType rowType) { + for (RowType.Field field : rowType.getFields()) { + projectColumns(name + "#" + field.getName().orElseThrow(), field.getType(), projectedColumnNames); + } + } + else { + projectedColumnNames.add(name); + } + } + private static boolean 
mayAnyDataColumnProjected(DeltaLakeTableHandle tableHandle) { if (tableHandle.getProjectedColumns().isEmpty()) { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java index 768269254471..d7566d40aaeb 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java @@ -57,10 +57,11 @@ import java.util.function.Function; import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; -import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.convertNestedMapKeys; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.hasInvalidStatistics; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.jsonEncodeMax; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.jsonEncodeMin; @@ -200,11 +201,15 @@ public DataFileInfo getDataFileInfo() private static DeltaLakeJsonFileStatistics readStatistics(TrinoInputFile inputFile, List dataColumnNames, List dataColumnTypes, long rowCount) throws IOException { - ImmutableMap.Builder typeForColumn = ImmutableMap.builder(); + ImmutableMap.Builder typeForProjectedColumnBuilder = ImmutableMap.builder(); for (int i = 0; i < dataColumnNames.size(); i++) { - typeForColumn.put(dataColumnNames.get(i), dataColumnTypes.get(i)); + String columnName = dataColumnNames.get(i); + Type columnType = dataColumnTypes.get(i); + 
populateType(columnName, columnType, typeForProjectedColumnBuilder); } + Map typeForProjectedColumn = typeForProjectedColumnBuilder.buildOrThrow(); + try (TrinoParquetDataSource trinoParquetDataSource = new TrinoParquetDataSource( inputFile, new ParquetReaderOptions(), @@ -214,15 +219,15 @@ private static DeltaLakeJsonFileStatistics readStatistics(TrinoInputFile inputFi ImmutableMultimap.Builder metadataForColumn = ImmutableMultimap.builder(); for (BlockMetaData blockMetaData : parquetMetadata.getBlocks()) { for (ColumnChunkMetaData columnChunkMetaData : blockMetaData.getColumns()) { - if (columnChunkMetaData.getPath().size() != 1) { - continue; // Only base column stats are supported + checkArgument(columnChunkMetaData.getPath().size() > 0, "columnChunkMetaData path must not be empty"); + String columnName = columnChunkMetaData.getPath().toDotString(); + if (typeForProjectedColumn.containsKey(columnName)) { + metadataForColumn.put(columnName, columnChunkMetaData); } - String columnName = getOnlyElement(columnChunkMetaData.getPath()); - metadataForColumn.put(columnName, columnChunkMetaData); } } - return mergeStats(metadataForColumn.build(), typeForColumn.buildOrThrow(), rowCount); + return mergeStats(metadataForColumn.build(), typeForProjectedColumn, rowCount); } } @@ -232,15 +237,18 @@ static DeltaLakeJsonFileStatistics mergeStats(Multimap>> statsForColumn = metadataForColumn.keySet().stream() .collect(toImmutableMap(identity(), key -> mergeMetadataList(metadataForColumn.get(key)))); - Map nullCount = statsForColumn.entrySet().stream() + Map> nullCount = statsForColumn.entrySet().stream() .filter(entry -> entry.getValue().isPresent()) - .collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().get().getNumNulls())); + .collect(toImmutableMap(Map.Entry::getKey, entry -> Optional.of(entry.getValue().get().getNumNulls()))); + + // TODO Databricks collect nullCount stats for Map and Array type as well, whereas trino does not collect + Map 
convertedNullCount = convertNestedMapKeys(nullCount); return new DeltaLakeJsonFileStatistics( Optional.of(rowCount), Optional.of(jsonEncodeMin(statsForColumn, typeForColumn)), Optional.of(jsonEncodeMax(statsForColumn, typeForColumn)), - Optional.of(nullCount)); + Optional.of(convertedNullCount)); } private static Optional> mergeMetadataList(Collection metadataList) @@ -257,6 +265,20 @@ private static Optional> mergeMetadataList(Collection typeForProjectedColumn) + { + if (type instanceof RowType rowType) { + List fields = rowType.getFields(); + for (RowType.Field field : fields) { + String projectedName = name + "." + field.getName().orElseThrow(); + populateType(projectedName, field.getType(), typeForProjectedColumn); + } + } + else { + typeForProjectedColumn.put(name, type); + } + } + @Override public String toString() { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/FileBasedTableStatisticsProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/FileBasedTableStatisticsProvider.java index 254e21b229ac..857e3596710e 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/FileBasedTableStatisticsProvider.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/FileBasedTableStatisticsProvider.java @@ -45,6 +45,7 @@ import static io.trino.plugin.deltalake.DeltaLakeColumnType.PARTITION_KEY; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; import static io.trino.plugin.deltalake.DeltaLakeMetadata.createStatisticsPredicate; +import static io.trino.plugin.deltalake.DeltaLakeMetadata.toColumnHandle; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isExtendedStatisticsEnabled; import static io.trino.plugin.deltalake.DeltaLakeSplitManager.partitionMatchesPredicate; import static io.trino.spi.statistics.StatsUtil.toStatsRepresentation; @@ -105,8 +106,9 @@ public TableStatistics 
getTableStatistics(ConnectorSession session, DeltaLakeTab Set predicatedColumnNames = tableHandle.getNonPartitionConstraint().getDomains().orElseThrow().keySet().stream() .map(DeltaLakeColumnHandle::getBaseColumnName) .collect(toImmutableSet()); - List predicatedColumns = columnMetadata.stream() + List predicatedColumns = columnMetadata.stream() .filter(column -> predicatedColumnNames.contains(column.getName())) + .map(column -> toColumnHandle(column.getColumnMetadata(), column.getFieldId(), column.getPhysicalName(), column.getPhysicalColumnType(), tableHandle.getMetadataEntry().getCanonicalPartitionColumns())) .collect(toImmutableList()); for (AddFileEntry addEntry : transactionLogAccess.getActiveFiles(tableSnapshot, session)) { @@ -122,8 +124,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab TupleDomain statisticsPredicate = createStatisticsPredicate( addEntry, - predicatedColumns, - tableHandle.getMetadataEntry().getCanonicalPartitionColumns()); + predicatedColumns); if (!tableHandle.getNonPartitionConstraint().overlaps(statisticsPredicate)) { continue; } @@ -147,7 +148,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab } } else { - Optional maybeNullCount = column.isBaseColumn() ? stats.getNullCount(column.getBasePhysicalColumnName()) : Optional.empty(); + Optional maybeNullCount = column.isBaseColumn() ? 
stats.getNullCount(column) : Optional.empty(); if (maybeNullCount.isPresent()) { nullCounts.put(column, nullCounts.get(column) + maybeNullCount.get()); } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeParquetStatisticsUtils.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeParquetStatisticsUtils.java index adbba2203695..516fdc4f76bf 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeParquetStatisticsUtils.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeParquetStatisticsUtils.java @@ -127,7 +127,7 @@ public static Object jsonValueToTrinoValue(Type type, @Nullable Object jsonValue return (double) jsonValue; } if (type instanceof DecimalType decimalType) { - BigDecimal decimal = new BigDecimal((String) jsonValue); + BigDecimal decimal = new BigDecimal(String.valueOf(jsonValue)); if (decimalType.isShort()) { return Decimals.encodeShortScaledValue(decimal, decimalType.getScale()); @@ -148,7 +148,7 @@ public static Object jsonValueToTrinoValue(Type type, @Nullable Object jsonValue List fieldTypes = rowType.getTypeParameters(); BlockBuilder blockBuilder = new RowBlockBuilder(fieldTypes, null, 1); BlockBuilder singleRowBlockWriter = blockBuilder.beginBlockEntry(); - for (int i = 0; i < values.size(); ++i) { + for (int i = 0; i < fieldTypes.size(); ++i) { Type fieldType = fieldTypes.get(i); String fieldName = rowType.getFields().get(i).getName().orElseThrow(() -> new IllegalArgumentException("Field name must exist")); Object fieldValue = jsonValueToTrinoValue(fieldType, values.remove(fieldName)); @@ -226,25 +226,40 @@ private static Object toJsonValue(Type type, @Nullable Object value) throw new UnsupportedOperationException("Unsupported type: " + type); } - public static Map jsonEncodeMin(Map>> stats, Map typeForColumn) + public static Map jsonEncodeMin(Map>> stats, Map 
typeForProjectedColumn) { - return jsonEncode(stats, typeForColumn, DeltaLakeParquetStatisticsUtils::getMin); + return jsonEncode(stats, typeForProjectedColumn, DeltaLakeParquetStatisticsUtils::getMin); } - public static Map jsonEncodeMax(Map>> stats, Map typeForColumn) + public static Map jsonEncodeMax(Map>> stats, Map typeForProjectedColumn) { - return jsonEncode(stats, typeForColumn, DeltaLakeParquetStatisticsUtils::getMax); + return jsonEncode(stats, typeForProjectedColumn, DeltaLakeParquetStatisticsUtils::getMax); } - private static Map jsonEncode(Map>> stats, Map typeForColumn, BiFunction, Optional> accessor) + private static Map jsonEncode(Map>> stats, Map typeForProjectedColumn, BiFunction, Optional> accessor) { Map> allStats = stats.entrySet().stream() .filter(entry -> entry.getValue() != null && entry.getValue().isPresent() && !entry.getValue().get().isEmpty()) - .collect(toImmutableMap(Map.Entry::getKey, entry -> accessor.apply(typeForColumn.get(entry.getKey()), entry.getValue().get()))); + .collect(toImmutableMap(Map.Entry::getKey, entry -> accessor.apply(typeForProjectedColumn.get(entry.getKey()), entry.getValue().get()))); - return allStats.entrySet().stream() - .filter(entry -> entry.getValue() != null && entry.getValue().isPresent()) - .collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().get())); + return convertNestedMapKeys(allStats); + } + + public static Map convertNestedMapKeys(Map> map) + { + Map convertedMap = new HashMap<>(); + for (Map.Entry> entry : map.entrySet()) { + String[] keys = entry.getKey().split("\\."); + Map nestedMap = convertedMap; + for (int i = 0; i < keys.length - 1; i++) { + nestedMap = (Map) nestedMap.computeIfAbsent(keys[i], k -> new HashMap<>()); + } + Optional value = entry.getValue(); + if (value != null && value.isPresent()) { + nestedMap.put(keys[keys.length - 1], value.get()); + } + } + return ImmutableMap.copyOf(convertedMap); } public static Map toNullCounts(Map columnTypeMapping, Map values) diff 
--git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java index 5f0e6eda0532..bc8d787258d8 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java @@ -26,6 +26,7 @@ import io.trino.filesystem.TrinoInputFile; import io.trino.plugin.base.util.JsonUtils; import io.trino.plugin.deltalake.DeltaLakeColumnHandle; +import io.trino.plugin.deltalake.DeltaLakeColumnProjectionInfo; import io.trino.plugin.deltalake.transactionlog.checkpoint.LastCheckpoint; import io.trino.spi.TrinoException; import io.trino.spi.type.DecimalType; @@ -53,7 +54,6 @@ import java.util.Optional; import java.util.function.Function; -import static com.google.common.base.Verify.verify; import static io.airlift.slice.Slices.utf8Slice; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath; @@ -164,8 +164,7 @@ private static Long readPartitionTimestamp(String timestamp) public static Object deserializeColumnValue(DeltaLakeColumnHandle column, String valueString, Function timestampReader) { - verify(column.isBaseColumn(), "Unexpected dereference: %s", column); - Type type = column.getBaseType(); + Type type = column.getProjectionInfo().map(DeltaLakeColumnProjectionInfo::getType).orElse(column.getBaseType()); try { if (type.equals(BOOLEAN)) { if (valueString.equalsIgnoreCase("true")) { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java index 
5d11a9ba4e99..b780a7410696 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java @@ -284,7 +284,7 @@ private void writeParsedStats(BlockBuilder entryBlockBuilder, RowType entryType, writeLong(statsBlockBuilder, statsType, 0, "numRecords", stats.getNumRecords().orElse(null)); writeMinMaxMapAsFields(statsBlockBuilder, statsType, 1, "minValues", stats.getMinValues(), false); writeMinMaxMapAsFields(statsBlockBuilder, statsType, 2, "maxValues", stats.getMaxValues(), false); - writeNullCountAsFields(statsBlockBuilder, statsType, 3, "nullCount", stats.getNullCount()); + writeNullCountAsFields(statsBlockBuilder, statsType, 3, "nullCount", stats.getNullCount(), false); } else { int internalFieldId = 0; @@ -295,7 +295,7 @@ private void writeParsedStats(BlockBuilder entryBlockBuilder, RowType entryType, if (statsType.getFields().stream().anyMatch(field -> field.getName().orElseThrow().equals("maxValues"))) { writeMinMaxMapAsFields(statsBlockBuilder, statsType, internalFieldId++, "maxValues", stats.getMaxValues(), true); } - writeNullCountAsFields(statsBlockBuilder, statsType, internalFieldId++, "nullCount", stats.getNullCount()); + writeNullCountAsFields(statsBlockBuilder, statsType, internalFieldId++, "nullCount", stats.getNullCount(), true); } entryBlockBuilder.closeEntry(); } @@ -307,9 +307,11 @@ private void writeMinMaxMapAsFields(BlockBuilder blockBuilder, RowType type, int writeObjectMapAsFields(blockBuilder, type, fieldId, fieldName, preprocessMinMaxValues(valuesFieldType, values, isJson)); } - private void writeNullCountAsFields(BlockBuilder blockBuilder, RowType type, int fieldId, String fieldName, Optional> values) + private void writeNullCountAsFields(BlockBuilder blockBuilder, RowType type, int fieldId, String fieldName, Optional> values, boolean isJson) { - 
writeObjectMapAsFields(blockBuilder, type, fieldId, fieldName, preprocessNullCount(values)); + RowType.Field valuesField = validateAndGetField(type, fieldId, fieldName); + RowType valuesFieldType = (RowType) valuesField.getType(); + writeObjectMapAsFields(blockBuilder, type, fieldId, fieldName, preprocessNullCount(valuesFieldType, values, isJson)); } private void writeObjectMapAsFields(BlockBuilder blockBuilder, RowType type, int fieldId, String fieldName, Optional> values) @@ -375,20 +377,28 @@ private Optional> preprocessMinMaxValues(RowType valuesType, }); } - private Optional> preprocessNullCount(Optional> valuesOptional) + private Optional> preprocessNullCount(RowType valuesType, Optional> valuesOptional, boolean isJson) { + Map fieldTypes = valuesType.getFields().stream() + .collect(toMap( + field -> field.getName().orElseThrow(), // anonymous row fields are not expected here + RowType.Field::getType)); return valuesOptional.map( values -> values.entrySet().stream() - .collect(toMap( - Map.Entry::getKey, - entry -> { - Object value = entry.getValue(); - if (value instanceof Integer) { - return (long) (int) value; - } - return value; - }))); + .collect(toMap( + Map.Entry::getKey, + entry -> { + Object value = entry.getValue(); + if (value instanceof Integer) { + return (long) (int) value; + } + if (isJson) { + Type type = fieldTypes.get(entry.getKey().toLowerCase(ENGLISH)); + return jsonValueToTrinoValue(type, value); + } + return value; + }))); } private void writeRemoveFileEntry(PageBuilder pageBuilder, RowType entryType, RemoveFileEntry removeFileEntry) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/statistics/DeltaLakeFileStatistics.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/statistics/DeltaLakeFileStatistics.java index 9c7a2ef3c540..d3a603ab4383 100644 --- 
a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/statistics/DeltaLakeFileStatistics.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/statistics/DeltaLakeFileStatistics.java @@ -33,7 +33,7 @@ public interface DeltaLakeFileStatistics Optional> getNullCount(); - Optional getNullCount(String columnName); + Optional getNullCount(DeltaLakeColumnHandle columnHandle); Optional getMinColumnValue(DeltaLakeColumnHandle columnHandle); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/statistics/DeltaLakeJsonFileStatistics.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/statistics/DeltaLakeJsonFileStatistics.java index 61d6954822e8..47dc258fad19 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/statistics/DeltaLakeJsonFileStatistics.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/statistics/DeltaLakeJsonFileStatistics.java @@ -17,10 +17,12 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; import io.airlift.json.ObjectMapperProvider; import io.airlift.log.Logger; import io.airlift.slice.SizeOf; import io.trino.plugin.deltalake.DeltaLakeColumnHandle; +import io.trino.plugin.deltalake.DeltaLakeColumnProjectionInfo; import io.trino.plugin.deltalake.transactionlog.CanonicalColumnName; import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess; import io.trino.spi.type.TimestampWithTimeZoneType; @@ -112,28 +114,19 @@ public Optional> getNullCount() @Override public Optional getMaxColumnValue(DeltaLakeColumnHandle columnHandle) { - if (!columnHandle.isBaseColumn()) { - return Optional.empty(); - } - Optional value = getStat(columnHandle.getBasePhysicalColumnName(), maxValues); + 
Optional value = getStat(columnHandle, maxValues); return value.flatMap(o -> deserializeStatisticsValue(columnHandle, String.valueOf(o))); } @Override public Optional getMinColumnValue(DeltaLakeColumnHandle columnHandle) { - if (!columnHandle.isBaseColumn()) { - return Optional.empty(); - } - Optional value = getStat(columnHandle.getBasePhysicalColumnName(), minValues); + Optional value = getStat(columnHandle, minValues); return value.flatMap(o -> deserializeStatisticsValue(columnHandle, String.valueOf(o))); } private Optional deserializeStatisticsValue(DeltaLakeColumnHandle columnHandle, String statValue) { - if (!columnHandle.isBaseColumn()) { - return Optional.empty(); - } Object columnValue = deserializeColumnValue(columnHandle, statValue, DeltaLakeJsonFileStatistics::readStatisticsTimestamp); Type columnType = columnHandle.getBaseType(); @@ -161,23 +154,30 @@ private static Long readStatisticsTimestamp(String timestamp) } @Override - public Optional getNullCount(String columnName) + public Optional getNullCount(DeltaLakeColumnHandle columnHandle) { - return getStat(columnName, nullCount).map(o -> Long.valueOf(o.toString())); + return getStat(columnHandle, nullCount).map(o -> Long.valueOf(o.toString())); } - private Optional getStat(String columnName, Optional> stats) + private Optional getStat(DeltaLakeColumnHandle columnHandle, Optional> stats) { if (stats.isEmpty()) { return Optional.empty(); } - CanonicalColumnName canonicalColumnName = new CanonicalColumnName(columnName); + CanonicalColumnName canonicalColumnName = new CanonicalColumnName(columnHandle.getBasePhysicalColumnName()); + List dereferenceNames = columnHandle.getProjectionInfo().map(DeltaLakeColumnProjectionInfo::getDereferencePhysicalNames) + .orElse(ImmutableList.of()); Object contents = stats.get().get(canonicalColumnName); + for (String dereferenceName : dereferenceNames) { + if (contents instanceof Map map) { + contents = map.get(dereferenceName); + } + } if (contents == null) { return 
Optional.empty(); } if (contents instanceof List || contents instanceof Map) { - log.debug("Skipping statistics value for column with complex value type: %s", columnName); + log.debug("Skipping statistics value for column with complex value type: %s", columnHandle); return Optional.empty(); } return Optional.of(contents); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/statistics/DeltaLakeParquetFileStatistics.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/statistics/DeltaLakeParquetFileStatistics.java index 4b5f098bf36e..55b75246582a 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/statistics/DeltaLakeParquetFileStatistics.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/statistics/DeltaLakeParquetFileStatistics.java @@ -16,9 +16,12 @@ import io.airlift.log.Logger; import io.airlift.slice.SizeOf; import io.trino.plugin.deltalake.DeltaLakeColumnHandle; +import io.trino.plugin.deltalake.DeltaLakeColumnProjectionInfo; import io.trino.plugin.deltalake.transactionlog.CanonicalColumnName; import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess; import io.trino.spi.block.Block; +import io.trino.spi.block.ColumnarRow; +import io.trino.spi.type.TypeUtils; import java.util.List; import java.util.Map; @@ -29,6 +32,7 @@ import static io.airlift.slice.SizeOf.estimatedSizeOf; import static io.airlift.slice.SizeOf.instanceSize; import static io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.toCanonicalNameKeyedMap; +import static io.trino.spi.block.ColumnarRow.toColumnarRow; public class DeltaLakeParquetFileStatistics implements DeltaLakeFileStatistics @@ -82,39 +86,54 @@ public Optional> getNullCount() @Override public Optional getMaxColumnValue(DeltaLakeColumnHandle columnHandle) { - if (!columnHandle.isBaseColumn()) { - return Optional.empty(); - } - return 
getStat(columnHandle.getBasePhysicalColumnName(), maxValues); + return getStat(columnHandle, maxValues); } @Override public Optional getMinColumnValue(DeltaLakeColumnHandle columnHandle) { - if (!columnHandle.isBaseColumn()) { - return Optional.empty(); - } - return getStat(columnHandle.getBasePhysicalColumnName(), minValues); + return getStat(columnHandle, minValues); } @Override - public Optional getNullCount(String columnName) + public Optional getNullCount(DeltaLakeColumnHandle columnHandle) { - return getStat(columnName, nullCount).map(o -> Long.valueOf(o.toString())); + return getStat(columnHandle, nullCount).map(o -> Long.valueOf(o.toString())); } - private Optional getStat(String columnName, Optional> stats) + private Optional getStat(DeltaLakeColumnHandle columnHandle, Optional> stats) { if (stats.isEmpty()) { return Optional.empty(); } - CanonicalColumnName canonicalColumnName = new CanonicalColumnName(columnName); + CanonicalColumnName canonicalColumnName = new CanonicalColumnName(columnHandle.getBasePhysicalColumnName()); + Object contents = stats.get().get(canonicalColumnName); + if (!columnHandle.isBaseColumn()) { + DeltaLakeColumnProjectionInfo projectionInfo = columnHandle.getProjectionInfo().get(); + if (contents instanceof Block block) { + List dereferenceIndices = projectionInfo.getDereferenceIndices(); + for (int index : dereferenceIndices) { + ColumnarRow columnarRow = toColumnarRow(block); + block = columnarRow.getField(index); + } + contents = TypeUtils.readNativeValue(projectionInfo.getType(), block, 0); + } + else { + List dereferenceNames = projectionInfo.getDereferencePhysicalNames(); + for (String dereferenceName : dereferenceNames) { + if (contents instanceof Map map) { + contents = map.get(dereferenceName); + } + } + } + } + if (contents == null) { return Optional.empty(); } if (contents instanceof List || contents instanceof Map || contents instanceof Block) { - log.debug("Skipping statistics value for column with complex value type: 
%s", columnName); + log.debug("Skipping statistics value for column with complex value type: %s", columnHandle); return Optional.empty(); } return Optional.of(contents); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java index 52313b14e5a0..2dbb4f78a0ff 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java @@ -948,11 +948,16 @@ public void testConvertJsonStatisticsToParquetOnRowType() assertUpdate("INSERT INTO json_stats_on_row_type SELECT CAST(row(3) AS row(x bigint)), CAST(row(row('test insert')) AS row(y row(nested varchar)))", 1); assertThat(getTableFiles(transactionLogDirectory)) .contains(newTransactionFile, newCheckpointFile); - assertThat(getAddFileEntries("json_stats_on_row_type")).hasSize(3); - // The first two entries created by Databricks have column stats. The last one doesn't have column stats because the connector doesn't support collecting it on row columns. + assertUpdate("INSERT INTO json_stats_on_row_type VALUES (null, null)", 1); + assertThat(getTableFiles(transactionLogDirectory)) + .contains(newTransactionFile, newCheckpointFile); + + assertThat(getAddFileEntries("json_stats_on_row_type")).hasSize(4); + + // The first two entries created by Databricks have column stats. 
The last two entries created by trino have column stats List addFileEntries = getAddFileEntries("json_stats_on_row_type").stream().sorted(comparing(AddFileEntry::getModificationTime)).collect(toImmutableList()); - assertThat(addFileEntries).hasSize(3); + assertThat(addFileEntries).hasSize(4); assertJsonStatistics( addFileEntries.get(0), "{" + @@ -971,7 +976,20 @@ public void testConvertJsonStatisticsToParquetOnRowType() "}"); assertJsonStatistics( addFileEntries.get(2), - "{\"numRecords\":1,\"minValues\":{},\"maxValues\":{},\"nullCount\":{}}"); + "{" + + "\"numRecords\":1," + + "\"minValues\":{\"nested_struct_col\":{\"y\":{\"nested\":\"test insert\"}},\"struct_col\":{\"x\":3}}," + + "\"maxValues\":{\"nested_struct_col\":{\"y\":{\"nested\":\"test insert\"}},\"struct_col\":{\"x\":3}}," + + "\"nullCount\":{\"nested_struct_col\":{\"y\":{\"nested\":0}},\"struct_col\":{\"x\":0}}" + + "}"); + assertJsonStatistics( + addFileEntries.get(3), + "{" + + "\"numRecords\":1," + + "\"minValues\":{\"nested_struct_col\":{\"y\":{}},\"struct_col\":{}}," + + "\"maxValues\":{\"nested_struct_col\":{\"y\":{}},\"struct_col\":{}}," + + "\"nullCount\":{\"nested_struct_col\":{\"y\":{\"nested\":1}},\"struct_col\":{\"x\":1}}" + + "}"); } private List getAddFileEntries(String tableName) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeComplexTypePredicatePushDown.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeComplexTypePredicatePushDown.java new file mode 100644 index 000000000000..60b5ba708665 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeComplexTypePredicatePushDown.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake; + +import com.google.common.collect.ImmutableMap; +import io.trino.testing.BaseTestFileFormatComplexTypesPredicatePushDown; +import io.trino.testing.QueryRunner; + +import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG; +import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.createDeltaLakeQueryRunner; + +public class TestDeltaLakeComplexTypePredicatePushDown + extends BaseTestFileFormatComplexTypesPredicatePushDown +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return createDeltaLakeQueryRunner( + DELTA_CATALOG, + ImmutableMap.of(), + ImmutableMap.of("delta.enable-non-concurrent-writes", "true")); + } +} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeCreateTableStatistics.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeCreateTableStatistics.java index 2fb3a30fc02e..8d457ac74738 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeCreateTableStatistics.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeCreateTableStatistics.java @@ -19,9 +19,14 @@ import io.trino.plugin.deltalake.transactionlog.AddFileEntry; import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics; import io.trino.plugin.hive.containers.HiveMinioDataLake; +import io.trino.spi.type.ArrayType; import io.trino.spi.type.DateType; import io.trino.spi.type.DecimalType; import io.trino.spi.type.DoubleType; 
+import io.trino.spi.type.MapType; +import io.trino.spi.type.RowType; +import io.trino.spi.type.Type; +import io.trino.spi.type.TypeOperators; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.QueryRunner; import org.testng.annotations.DataProvider; @@ -41,11 +46,14 @@ import static io.airlift.slice.Slices.utf8Slice; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone; import static io.trino.spi.type.Decimals.MAX_SHORT_PRECISION; import static io.trino.spi.type.Decimals.encodeScaledValue; import static io.trino.spi.type.TimeZoneKey.UTC_KEY; import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; +import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; import static io.trino.testing.TestingNames.randomNameSuffix; import static java.lang.Double.NEGATIVE_INFINITY; @@ -83,24 +91,93 @@ public void testComplexDataTypes() "test_complex_data_types_", ImmutableList.of("a", "b", "c", "d"), "VALUES (CAST(ROW(1, 2) AS ROW(x BIGINT, y BIGINT)), ARRAY[1, 2, 3], MAP(ARRAY[1, 2], ARRAY['a', 'b']), 'foo'), " + - "(CAST(ROW(3, 4) AS ROW(x BIGINT, y BIGINT)), ARRAY[4, 5], MAP(ARRAY[3], ARRAY['c']), 'moo')")) { + "(CAST(ROW(null, 4) AS ROW(x BIGINT, y BIGINT)), ARRAY[-1, -2, -3], MAP(ARRAY[4], ARRAY['d']), null), " + + "(CAST(ROW(3, null) AS ROW(x BIGINT, y BIGINT)), ARRAY[4, 5], MAP(ARRAY[3], ARRAY['c']), 'moo')")) { List addFileEntries = getAddFileEntries(table.getName()); AddFileEntry entry = getOnlyElement(addFileEntries); assertThat(entry.getStats()).isPresent(); DeltaLakeFileStatistics fileStatistics = entry.getStats().get(); DeltaLakeColumnHandle columnHandle = new DeltaLakeColumnHandle("d", 
createUnboundedVarcharType(), OptionalInt.empty(), "d", createUnboundedVarcharType(), REGULAR, Optional.empty()); - assertEquals(fileStatistics.getNumRecords(), Optional.of(2L)); + assertEquals(fileStatistics.getNumRecords(), Optional.of(3L)); assertEquals(fileStatistics.getMinColumnValue(columnHandle), Optional.of(utf8Slice("foo"))); assertEquals(fileStatistics.getMaxColumnValue(columnHandle), Optional.of(utf8Slice("moo"))); - assertEquals(fileStatistics.getNullCount("d"), Optional.of(0L)); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.of(1L)); for (String complexColumn : ImmutableList.of("a", "b", "c")) { columnHandle = new DeltaLakeColumnHandle(complexColumn, createUnboundedVarcharType(), OptionalInt.empty(), complexColumn, createUnboundedVarcharType(), REGULAR, Optional.empty()); assertThat(fileStatistics.getMaxColumnValue(columnHandle)).isEmpty(); assertThat(fileStatistics.getMinColumnValue(columnHandle)).isEmpty(); - assertThat(fileStatistics.getNullCount(complexColumn)).isEmpty(); + assertThat(fileStatistics.getNullCount(columnHandle)).isEmpty(); } + + // dereference column statistics + Type rowBaseType = RowType.rowType(RowType.field("x", BIGINT), RowType.field("x", BIGINT)); + + columnHandle = new DeltaLakeColumnHandle("a", rowBaseType, OptionalInt.empty(), "a", rowBaseType, REGULAR, Optional.of(new DeltaLakeColumnProjectionInfo(BIGINT, ImmutableList.of(0), ImmutableList.of("x")))); + assertEquals(fileStatistics.getMinColumnValue(columnHandle), Optional.of(1L)); + assertEquals(fileStatistics.getMaxColumnValue(columnHandle), Optional.of(3L)); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.of(1L)); + + columnHandle = new DeltaLakeColumnHandle("a", rowBaseType, OptionalInt.empty(), "a", rowBaseType, REGULAR, Optional.of(new DeltaLakeColumnProjectionInfo(BIGINT, ImmutableList.of(1), ImmutableList.of("y")))); + assertEquals(fileStatistics.getMinColumnValue(columnHandle), Optional.of(2L)); + 
assertEquals(fileStatistics.getMaxColumnValue(columnHandle), Optional.of(4L)); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.of(1L)); + } + } + + @Test + public void testPrimitiveTypeInsideRowColumn() + throws Exception + { + try (TestTable table = new TestTable( + "test_primitive_type_inside_row_column_", + ImmutableList.of("x"), + "VALUES " + + "ROW(CAST(ROW(1, 'stringValue', ARRAY[1, 2, 3], true, MAP(ARRAY[1], ARRAY['mapValue1'])) AS ROW(a BIGINT, b VARCHAR, c ARRAY(BIGINT), d BOOLEAN, e MAP(BIGINT, VARCHAR)))), " + + "ROW(CAST(ROW(2, null, ARRAY[4, null, 6], true, MAP(ARRAY[2], ARRAY[null])) AS ROW(a BIGINT, b VARCHAR, c ARRAY(BIGINT), d BOOLEAN, e MAP(BIGINT, VARCHAR)))), " + + "ROW(CAST(null AS ROW(a BIGINT, b VARCHAR, c ARRAY(BIGINT), d BOOLEAN, e MAP(BIGINT, VARCHAR))))")) { + List addFileEntries = getAddFileEntries(table.getName()); + AddFileEntry entry = getOnlyElement(addFileEntries); + assertThat(entry.getStats()).isPresent(); + DeltaLakeFileStatistics fileStatistics = entry.getStats().get(); + + Type rowBaseType = RowType.rowType( + RowType.field("a", BIGINT), + RowType.field("b", createUnboundedVarcharType()), + RowType.field("c", new ArrayType(BIGINT)), + RowType.field("d", BOOLEAN), + RowType.field("e", new MapType(BIGINT, VARCHAR, new TypeOperators()))); + + // x.a + DeltaLakeColumnHandle columnHandle = new DeltaLakeColumnHandle("x", rowBaseType, OptionalInt.empty(), "x", rowBaseType, REGULAR, Optional.of(new DeltaLakeColumnProjectionInfo(BIGINT, ImmutableList.of(0), ImmutableList.of("a")))); + assertEquals(fileStatistics.getMinColumnValue(columnHandle), Optional.of(1L)); + assertEquals(fileStatistics.getMaxColumnValue(columnHandle), Optional.of(2L)); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.of(1L)); + + // x.b + columnHandle = new DeltaLakeColumnHandle("x", rowBaseType, OptionalInt.empty(), "x", rowBaseType, REGULAR, Optional.of(new DeltaLakeColumnProjectionInfo(createUnboundedVarcharType(), 
ImmutableList.of(1), ImmutableList.of("b")))); + assertEquals(fileStatistics.getMinColumnValue(columnHandle), Optional.of(utf8Slice("stringValue"))); + assertEquals(fileStatistics.getMaxColumnValue(columnHandle), Optional.of(utf8Slice("stringValue"))); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.of(2L)); + + // x.c + columnHandle = new DeltaLakeColumnHandle("x", rowBaseType, OptionalInt.empty(), "x", rowBaseType, REGULAR, Optional.of(new DeltaLakeColumnProjectionInfo(new ArrayType(BIGINT), ImmutableList.of(2), ImmutableList.of("c")))); + assertEquals(fileStatistics.getMinColumnValue(columnHandle), Optional.empty()); + assertEquals(fileStatistics.getMaxColumnValue(columnHandle), Optional.empty()); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.empty()); + + // x.d + columnHandle = new DeltaLakeColumnHandle("x", rowBaseType, OptionalInt.empty(), "x", rowBaseType, REGULAR, Optional.of(new DeltaLakeColumnProjectionInfo(BOOLEAN, ImmutableList.of(3), ImmutableList.of("d")))); + assertEquals(fileStatistics.getMinColumnValue(columnHandle), Optional.empty()); + assertEquals(fileStatistics.getMaxColumnValue(columnHandle), Optional.empty()); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.of(1L)); + + // x.e + columnHandle = new DeltaLakeColumnHandle("x", rowBaseType, OptionalInt.empty(), "x", rowBaseType, REGULAR, Optional.of(new DeltaLakeColumnProjectionInfo(new MapType(BIGINT, VARCHAR, new TypeOperators()), ImmutableList.of(4), ImmutableList.of("e")))); + assertEquals(fileStatistics.getMinColumnValue(columnHandle), Optional.empty()); + assertEquals(fileStatistics.getMaxColumnValue(columnHandle), Optional.empty()); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.empty()); } } @@ -125,7 +202,7 @@ public void testDoubleTypesNaN(String type) assertEquals(fileStatistics.getNumRecords(), Optional.of(2L)); assertEquals(fileStatistics.getMinColumnValue(columnHandle), Optional.empty()); 
assertEquals(fileStatistics.getMaxColumnValue(columnHandle), Optional.empty()); - assertEquals(fileStatistics.getNullCount(columnName), Optional.empty()); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.empty()); } } @@ -147,7 +224,7 @@ public void testDoubleTypesInf(String type) assertEquals(fileStatistics.getNumRecords(), Optional.of(3L)); assertEquals(fileStatistics.getMinColumnValue(columnHandle), Optional.of(NEGATIVE_INFINITY)); assertEquals(fileStatistics.getMaxColumnValue(columnHandle), Optional.of(POSITIVE_INFINITY)); - assertEquals(fileStatistics.getNullCount(columnName), Optional.of(0L)); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.of(0L)); } } @@ -169,7 +246,7 @@ public void testDoubleTypesInfAndNaN(String type) assertEquals(fileStatistics.getNumRecords(), Optional.of(4L)); assertEquals(fileStatistics.getMinColumnValue(columnHandle), Optional.empty()); assertEquals(fileStatistics.getMaxColumnValue(columnHandle), Optional.empty()); - assertEquals(fileStatistics.getNullCount(columnName), Optional.empty()); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.empty()); } } @@ -191,7 +268,7 @@ public void testDoubleTypesNaNPositive(String type) assertEquals(fileStatistics.getNumRecords(), Optional.of(4L)); assertEquals(fileStatistics.getMinColumnValue(columnHandle), Optional.empty()); assertEquals(fileStatistics.getMaxColumnValue(columnHandle), Optional.empty()); - assertEquals(fileStatistics.getNullCount(columnName), Optional.empty()); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.empty()); } } @@ -213,7 +290,7 @@ public void testDoubleTypesNaNNegative(String type) assertEquals(fileStatistics.getNumRecords(), Optional.of(4L)); assertEquals(fileStatistics.getMinColumnValue(columnHandle), Optional.empty()); assertEquals(fileStatistics.getMaxColumnValue(columnHandle), Optional.empty()); - assertEquals(fileStatistics.getNullCount(columnName), Optional.empty()); + 
assertEquals(fileStatistics.getNullCount(columnHandle), Optional.empty()); } } @@ -269,7 +346,7 @@ private void testDecimal(int precision, int scale) } assertEquals(fileStatistics.getMinColumnValue(columnHandle), expectedMin); assertEquals(fileStatistics.getMaxColumnValue(columnHandle), expectedMax); - assertEquals(fileStatistics.getNullCount(columnName), Optional.of(0L)); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.of(0L)); } } @@ -288,7 +365,7 @@ public void testNullRecords() assertEquals(fileStatistics.getNumRecords(), Optional.of(4L)); assertEquals(fileStatistics.getMinColumnValue(columnHandle), Optional.of(0.0)); assertEquals(fileStatistics.getMaxColumnValue(columnHandle), Optional.of(1.0)); - assertEquals(fileStatistics.getNullCount(columnName), Optional.of(2L)); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.of(2L)); } } @@ -310,7 +387,7 @@ public void testOnlyNullRecords() assertEquals(fileStatistics.getNumRecords(), Optional.of(4L)); assertEquals(fileStatistics.getMinColumnValue(columnHandle), Optional.empty()); assertEquals(fileStatistics.getMaxColumnValue(columnHandle), Optional.empty()); - assertEquals(fileStatistics.getNullCount(columnName), Optional.of(4L)); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.of(4L)); } } @@ -332,7 +409,7 @@ public void testDateRecords() assertEquals(fileStatistics.getNumRecords(), Optional.of(4L)); assertEquals(fileStatistics.getMinColumnValue(columnHandle), Optional.of(LocalDate.parse("2011-08-08").toEpochDay())); assertEquals(fileStatistics.getMaxColumnValue(columnHandle), Optional.of(LocalDate.parse("2013-08-09").toEpochDay())); - assertEquals(fileStatistics.getNullCount(columnName), Optional.of(0L)); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.of(0L)); } } @@ -358,7 +435,7 @@ public void testTimestampMilliRecords() assertEquals( fileStatistics.getMaxColumnValue(columnHandle), 
Optional.of(packDateTimeWithZone(ZonedDateTime.parse("2012-10-31T08:00:00.123Z").toInstant().toEpochMilli(), UTC_KEY))); - assertEquals(fileStatistics.getNullCount(columnName), Optional.of(0L)); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.of(0L)); } } @@ -377,7 +454,7 @@ public void testUnicodeValues() assertEquals(fileStatistics.getNumRecords(), Optional.of(2L)); assertEquals(fileStatistics.getMinColumnValue(columnHandle), Optional.of(utf8Slice("ab\uFAD8"))); assertEquals(fileStatistics.getMaxColumnValue(columnHandle), Optional.of(utf8Slice("ab\uD83D\uDD74"))); - assertEquals(fileStatistics.getNullCount(columnName), Optional.of(0L)); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.of(0L)); } } @@ -404,13 +481,13 @@ public void testPartitionedTable() assertEquals(fileStatistics.getMinColumnValue(columnHandle), Optional.of(utf8Slice("a"))); assertEquals(fileStatistics.getMaxColumnValue(columnHandle), Optional.of(utf8Slice("c"))); assertEquals(fileStatistics.getNumRecords(), Optional.of(4L)); - assertEquals(fileStatistics.getNullCount(columnName), Optional.of(1L)); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.of(1L)); } else if (addFileEntry.getPartitionValues().get(partitionColumn).equals("2")) { assertEquals(fileStatistics.getMinColumnValue(columnHandle), Optional.of(utf8Slice("c"))); assertEquals(fileStatistics.getMaxColumnValue(columnHandle), Optional.of(utf8Slice("e"))); assertEquals(fileStatistics.getNumRecords(), Optional.of(3L)); - assertEquals(fileStatistics.getNullCount(columnName), Optional.of(0L)); + assertEquals(fileStatistics.getNullCount(columnHandle), Optional.of(0L)); } } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeTableStatistics.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeTableStatistics.java index 91d1fde1dafd..6416e3828384 100644 --- 
a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeTableStatistics.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeTableStatistics.java @@ -41,6 +41,9 @@ public void registerTables() String dataPath = Resources.getResource("databricks/person").toExternalForm(); getQueryRunner().execute( format("CALL system.register_table('%s', 'person', '%s')", getSession().getSchema().orElseThrow(), dataPath)); + dataPath = Resources.getResource("databricks/pruning/nested_fields").toExternalForm(); + getQueryRunner().execute( + format("CALL system.register_table('%s', 'nested_fields', '%s')", getSession().getSchema().orElseThrow(), dataPath)); } @Test @@ -176,4 +179,17 @@ public void testShowStatsForAllNullColumn() "('col', 0.0, 0.0, 1.0, null, null, null)," + "(null, null, null, null, 1.0, null, null)"); } + + @Test + public void testShowStatsNestedFields() + { + assertQuery( + "SHOW STATS FOR nested_fields", + "VALUES " + + // column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value + "('id', null, null, 0.0, null, 1, 10)," + + "('parent', null, null, null, null, null, null)," + + "('grandparent', null, null, null, null, null, null)," + + "(null, null, null, null, 10.0, null, null)"); + } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeWriter.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeWriter.java index c2a075507932..cf7ef37d431f 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeWriter.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeWriter.java @@ -64,7 +64,7 @@ public void testMergeIntStatistics() assertEquals(fileStats.getNumRecords(), Optional.of(20L)); assertEquals(fileStats.getMinColumnValue(intColumn), Optional.of(-200L)); assertEquals(fileStats.getMaxColumnValue(intColumn), Optional.of(250L)); - 
assertEquals(fileStats.getNullCount(columnName), Optional.of(13L)); + assertEquals(fileStats.getNullCount(intColumn), Optional.of(13L)); } @Test @@ -83,7 +83,7 @@ public void testMergeFloatStatistics() assertEquals(fileStats.getNumRecords(), Optional.of(20L)); assertEquals(fileStats.getMinColumnValue(floatColumn), Optional.of((long) floatToRawIntBits(-2.001f))); assertEquals(fileStats.getMaxColumnValue(floatColumn), Optional.of((long) floatToRawIntBits(1.0f))); - assertEquals(fileStats.getNullCount(columnName), Optional.of(13L)); + assertEquals(fileStats.getNullCount(floatColumn), Optional.of(13L)); } @Test @@ -104,7 +104,7 @@ public void testMergeFloatNaNStatistics() assertEquals(fileStats.getNumRecords(), Optional.of(20L)); assertEquals(fileStats.getMinColumnValue(floatColumn), Optional.empty()); assertEquals(fileStats.getMaxColumnValue(floatColumn), Optional.empty()); - assertEquals(fileStats.getNullCount(columnName), Optional.empty()); + assertEquals(fileStats.getNullCount(floatColumn), Optional.empty()); } @Test @@ -125,7 +125,7 @@ public void testMergeDoubleNaNStatistics() assertEquals(fileStats.getNumRecords(), Optional.of(20L)); assertEquals(fileStats.getMinColumnValue(doubleColumn), Optional.empty()); assertEquals(fileStats.getMaxColumnValue(doubleColumn), Optional.empty()); - assertEquals(fileStats.getNullCount(columnName), Optional.empty()); + assertEquals(fileStats.getNullCount(doubleColumn), Optional.empty()); } @Test @@ -144,7 +144,7 @@ public void testMergeStringStatistics() assertEquals(fileStats.getNumRecords(), Optional.of(20L)); assertEquals(fileStats.getMinColumnValue(varcharColumn), Optional.of(utf8Slice("aba"))); assertEquals(fileStats.getMaxColumnValue(varcharColumn), Optional.of(utf8Slice("ab⌘"))); - assertEquals(fileStats.getNullCount(columnName), Optional.of(12L)); + assertEquals(fileStats.getNullCount(varcharColumn), Optional.of(12L)); } @Test @@ -163,7 +163,7 @@ public void testMergeStringUnicodeStatistics() 
assertEquals(fileStats.getNumRecords(), Optional.of(20L)); assertEquals(fileStats.getMinColumnValue(varcharColumn), Optional.of(utf8Slice("aba"))); assertEquals(fileStats.getMaxColumnValue(varcharColumn), Optional.of(utf8Slice("ab\uD83D\uDD74"))); - assertEquals(fileStats.getNullCount(columnName), Optional.of(12L)); + assertEquals(fileStats.getNullCount(varcharColumn), Optional.of(12L)); } private ColumnChunkMetaData createMetaData(String columnName, PrimitiveType columnType, long valueCount, Statistics statistics) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestSplitPruning.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestSplitPruning.java index 6a58359c9fca..60d783151f95 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestSplitPruning.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestSplitPruning.java @@ -372,17 +372,40 @@ public void testPrimitiveFieldsInsideRowColumnPruning() Set.of(), 0); - // TODO pruning does not work on primitive fields inside a struct, expected splits should be 1 after file pruning (https://github.com/trinodb/trino/issues/17164) + assertResultAndSplitCount( + "SELECT grandparent.parent1.child1 FROM nested_fields WHERE parent.child1 > 400", + Set.of(50.99, 60.99, 70.99, 80.99, 90.99, 100.99), + 2); + assertResultAndSplitCount( "SELECT grandparent.parent1.child1 FROM nested_fields WHERE parent.child1 > 600", Set.of(70.99, 80.99, 90.99, 100.99), - 2); + 1); + + assertResultAndSplitCount( + "SELECT grandparent.parent1.child1 FROM nested_fields WHERE parent.child1 > 600 AND parent.child1 < 1000", + Set.of(70.99, 80.99, 90.99), + 1); + + assertResultAndSplitCount( + "SELECT grandparent.parent1.child1 FROM nested_fields WHERE parent.child1 > 600 AND grandparent.parent1.child1 < 100", + Set.of(70.99, 80.99, 90.99), + 1); - // TODO pruning does not work on primitive fields inside a struct, expected splits should be 0 after file 
pruning (https://github.com/trinodb/trino/issues/17164) assertResultAndSplitCount( "SELECT grandparent.parent1.child1 FROM nested_fields WHERE parent.child1 > 1000", Set.of(), - 2); + 0); + + assertResultAndSplitCount( + "SELECT grandparent.parent1.child1 FROM nested_fields WHERE parent.child1 > 1000 AND parent.child2 = 'INDIA'", + Set.of(), + 0); + + assertResultAndSplitCount( + "SELECT grandparent.parent1.child1 FROM nested_fields WHERE parent.child1 > 1000 AND grandparent.parent1.child1 < 20", + Set.of(), + 0); } @Test diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java index 5b53d6acb097..bdc22ec689c7 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java @@ -624,7 +624,7 @@ public void testSnapshotsAreConsistent() DeltaLakeColumnHandle columnHandle = new DeltaLakeColumnHandle(column.getName(), column.getType(), OptionalInt.empty(), column.getName(), column.getType(), REGULAR, Optional.empty()); assertEquals(expected.getStats().get().getMinColumnValue(columnHandle), actual.getStats().get().getMinColumnValue(columnHandle)); assertEquals(expected.getStats().get().getMaxColumnValue(columnHandle), actual.getStats().get().getMaxColumnValue(columnHandle)); - assertEquals(expected.getStats().get().getNullCount(columnHandle.getBaseColumnName()), actual.getStats().get().getNullCount(columnHandle.getBaseColumnName())); + assertEquals(expected.getStats().get().getNullCount(columnHandle), actual.getStats().get().getNullCount(columnHandle)); assertEquals(expected.getStats().get().getNumRecords(), actual.getStats().get().getNumRecords()); } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeParquetStatisticsUtils.java 
b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeParquetStatisticsUtils.java index 4b5fb2feec79..033f30688755 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeParquetStatisticsUtils.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeParquetStatisticsUtils.java @@ -17,7 +17,6 @@ import io.airlift.slice.Slice; import io.airlift.slice.Slices; import io.trino.spi.type.DoubleType; -import io.trino.spi.type.IntegerType; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; @@ -26,9 +25,13 @@ import java.nio.ByteBuffer; import java.time.LocalDate; import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; +import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.DateType.DATE; +import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.RealType.REAL; import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; @@ -54,10 +57,10 @@ public void testIntegerStatistics() .build(); assertEquals( - DeltaLakeParquetStatisticsUtils.jsonEncodeMin(ImmutableMap.of(columnName, Optional.of(stats)), ImmutableMap.of(columnName, IntegerType.INTEGER)), + DeltaLakeParquetStatisticsUtils.jsonEncodeMin(ImmutableMap.of(columnName, Optional.of(stats)), ImmutableMap.of(columnName, INTEGER)), ImmutableMap.of(columnName, -100)); assertEquals( - DeltaLakeParquetStatisticsUtils.jsonEncodeMax(ImmutableMap.of(columnName, Optional.of(stats)), ImmutableMap.of(columnName, IntegerType.INTEGER)), + DeltaLakeParquetStatisticsUtils.jsonEncodeMax(ImmutableMap.of(columnName, Optional.of(stats)), ImmutableMap.of(columnName, INTEGER)), ImmutableMap.of(columnName, 150)); } @@ -171,6 
+174,120 @@ public void testTimestampStatisticsMillisPrecision() ImmutableMap.of(columnName, "2020-08-26T01:02:03.123Z")); } + @Test + public void testNestedFieldStatistics() + { + String stringColumn = "t_grandparent.t_parent.t_string"; + PrimitiveType stringType = new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, stringColumn); + Statistics stringColumnStats = Statistics.getBuilderForReading(stringType) + .withMin("abc".getBytes(UTF_8)) + .withMax("bac".getBytes(UTF_8)) + .withNumNulls(1) + .build(); + + assertEquals( + DeltaLakeParquetStatisticsUtils.jsonEncodeMin(ImmutableMap.of(stringColumn, Optional.of(stringColumnStats)), ImmutableMap.of(stringColumn, createUnboundedVarcharType())), + ImmutableMap.of("t_grandparent", ImmutableMap.of("t_parent", ImmutableMap.of("t_string", "abc")))); + assertEquals( + DeltaLakeParquetStatisticsUtils.jsonEncodeMax(ImmutableMap.of(stringColumn, Optional.of(stringColumnStats)), ImmutableMap.of(stringColumn, createUnboundedVarcharType())), + ImmutableMap.of("t_grandparent", ImmutableMap.of("t_parent", ImmutableMap.of("t_string", "bac")))); + + String booleanColumn = "t_grandparent.t_parent.t_boolean"; + PrimitiveType booleanType = new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BOOLEAN, booleanColumn); + Statistics booleanColumnStats = Statistics.getBuilderForReading(booleanType) + .withMin(getBooleanByteArray(false)) + .withMax(getBooleanByteArray(true)) + .withNumNulls(1) + .build(); + + assertEquals( + DeltaLakeParquetStatisticsUtils.jsonEncodeMin(ImmutableMap.of(booleanColumn, Optional.of(booleanColumnStats)), ImmutableMap.of(booleanColumn, BOOLEAN)), + ImmutableMap.of("t_grandparent", ImmutableMap.of("t_parent", ImmutableMap.of()))); + assertEquals( + DeltaLakeParquetStatisticsUtils.jsonEncodeMax(ImmutableMap.of(booleanColumn, Optional.of(booleanColumnStats)), ImmutableMap.of(booleanColumn, BOOLEAN)), + ImmutableMap.of("t_grandparent", ImmutableMap.of("t_parent", 
ImmutableMap.of()))); + + String nullColumn = "t_parent.t_null"; + PrimitiveType nullColumnType = new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, nullColumn); + Statistics nullColumnStats = Statistics.getBuilderForReading(nullColumnType) + .withMin(null) + .withMax(null) + .withNumNulls(1) + .build(); + + assertEquals( + DeltaLakeParquetStatisticsUtils.jsonEncodeMin(ImmutableMap.of(nullColumn, Optional.of(nullColumnStats)), ImmutableMap.of(nullColumn, INTEGER)), + ImmutableMap.of("t_parent", ImmutableMap.of())); + assertEquals( + DeltaLakeParquetStatisticsUtils.jsonEncodeMax(ImmutableMap.of(nullColumn, Optional.of(nullColumnStats)), ImmutableMap.of(nullColumn, INTEGER)), + ImmutableMap.of("t_parent", ImmutableMap.of())); + } + + @Test + public void testMultipleNestedFieldStatistics() + { + String stringColumn = "t_grandparent.t_parent.t_string"; + PrimitiveType stringType = new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, stringColumn); + Statistics stringColumnStats = Statistics.getBuilderForReading(stringType) + .withMin("abc".getBytes(UTF_8)) + .withMax("bac".getBytes(UTF_8)) + .withNumNulls(1) + .build(); + + String booleanColumn = "t_grandparent.t_parent.t_boolean"; + PrimitiveType booleanType = new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BOOLEAN, booleanColumn); + Statistics booleanColumnStats = Statistics.getBuilderForReading(booleanType) + .withMin(getBooleanByteArray(false)) + .withMax(getBooleanByteArray(true)) + .withNumNulls(1) + .build(); + + String timestampColumn = "t_grandparent.t_timestamp"; + PrimitiveType timestampType = new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT96, timestampColumn); + Statistics timestampColumnStats = Statistics.getBuilderForReading(timestampType) + .withMin(toParquetEncoding(LocalDateTime.parse("2020-08-26T01:02:03.123456789"))) + 
.withMax(toParquetEncoding(LocalDateTime.parse("2020-08-26T01:02:03.123987654"))) + .withNumNulls(2) + .build(); + + assertEquals( + DeltaLakeParquetStatisticsUtils.jsonEncodeMin( + ImmutableMap.of(stringColumn, Optional.of(stringColumnStats), booleanColumn, Optional.of(booleanColumnStats), timestampColumn, Optional.of(timestampColumnStats)), + ImmutableMap.of(stringColumn, createUnboundedVarcharType(), booleanColumn, BOOLEAN, timestampColumn, TIMESTAMP_TZ_MILLIS)), + ImmutableMap.of("t_grandparent", ImmutableMap.of("t_parent", ImmutableMap.of("t_string", "abc"), "t_timestamp", "2020-08-26T01:02:03.123Z"))); + assertEquals( + DeltaLakeParquetStatisticsUtils.jsonEncodeMax( + ImmutableMap.of(stringColumn, Optional.of(stringColumnStats), booleanColumn, Optional.of(booleanColumnStats), timestampColumn, Optional.of(timestampColumnStats)), + ImmutableMap.of(stringColumn, createUnboundedVarcharType(), booleanColumn, BOOLEAN, timestampColumn, TIMESTAMP_TZ_MILLIS)), + ImmutableMap.of("t_grandparent", ImmutableMap.of("t_parent", ImmutableMap.of("t_string", "bac"), "t_timestamp", "2020-08-26T01:02:03.124Z"))); + } + + @Test + public void testPopulateNestedStats() + { + Map> allStats = new HashMap<>(); + allStats.put("base1", Optional.of(2)); + allStats.put("base2", Optional.of("base2Value")); + allStats.put("base3", Optional.of(100L)); + allStats.put("base4", Optional.empty()); // should get discarded + allStats.put("base5", null); // should get discarded + allStats.put("base6.f1", Optional.empty()); // should get discarded + allStats.put("base6.f2", Optional.of(99.99)); + allStats.put("base6.f3", Optional.of("2020-08-26T01:02:03.123Z")); + allStats.put("base7.level1.f4", null); // should get discarded + allStats.put("base7.level1.f5", Optional.empty()); // should get discarded + allStats.put("base7.level1.f6", Optional.of("base5Level1F6Value")); + + assertEquals( + DeltaLakeParquetStatisticsUtils.convertNestedMapKeys(allStats), + ImmutableMap.of( + "base1", 2, + "base2", "base2Value", 
+ "base3", 100L, + "base6", ImmutableMap.of("f2", 99.99, "f3", "2020-08-26T01:02:03.123Z"), + "base7", ImmutableMap.of("level1", ImmutableMap.of("f6", "base5Level1F6Value")))); + } + private static byte[] toParquetEncoding(LocalDateTime time) { long timeOfDayNanos = (long) time.getNano() + (time.toEpochSecond(UTC) - time.toLocalDate().atStartOfDay().toEpochSecond(UTC)) * 1_000_000_000; @@ -202,4 +319,9 @@ static byte[] getDoubleByteArray(double d) { return ByteBuffer.allocate(8).order(LITTLE_ENDIAN).putDouble(d).array(); } + + static byte[] getBooleanByteArray(boolean b) + { + return ByteBuffer.allocate(1).put((byte) (b ? 1 : 0)).array(); + } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/statistics/BenchmarkExtendedStatistics.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/statistics/BenchmarkExtendedStatistics.java index d54c931b85af..d7c7b158c130 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/statistics/BenchmarkExtendedStatistics.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/statistics/BenchmarkExtendedStatistics.java @@ -127,7 +127,7 @@ public long benchmark(BenchmarkData benchmarkData) DeltaLakeColumnHandle column = benchmarkData.columns.get(benchmarkData.random.nextInt(benchmarkData.columnsCount)); result += (long) statistics.getMaxColumnValue(column).get(); result += (long) statistics.getMinColumnValue(column).get(); - result += statistics.getNullCount(column.getBaseColumnName()).get(); + result += statistics.getNullCount(column).get(); } } return result; diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/statistics/TestDeltaLakeFileStatistics.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/statistics/TestDeltaLakeFileStatistics.java index 6046b41a97f3..13d25be94783 100644 --- 
a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/statistics/TestDeltaLakeFileStatistics.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/statistics/TestDeltaLakeFileStatistics.java @@ -14,11 +14,13 @@ package io.trino.plugin.deltalake.transactionlog.statistics; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import io.airlift.json.ObjectMapperProvider; import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.local.LocalInputFile; import io.trino.plugin.deltalake.DeltaLakeColumnHandle; +import io.trino.plugin.deltalake.DeltaLakeColumnProjectionInfo; import io.trino.plugin.deltalake.DeltaLakeConfig; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; @@ -163,6 +165,9 @@ private static void testStatisticsValues(DeltaLakeFileStatistics fileStatistics) assertEquals( fileStatistics.getMinColumnValue(new DeltaLakeColumnHandle("row", rowType, OptionalInt.empty(), "row", rowType, REGULAR, Optional.empty())), Optional.empty()); + assertEquals( + fileStatistics.getMinColumnValue(new DeltaLakeColumnHandle("row", rowType, OptionalInt.empty(), "row", rowType, REGULAR, Optional.of(new DeltaLakeColumnProjectionInfo(INTEGER, ImmutableList.of(0), ImmutableList.of("s1"))))), + Optional.of(1L)); assertEquals( fileStatistics.getMinColumnValue(new DeltaLakeColumnHandle("arr", new ArrayType(INTEGER), OptionalInt.empty(), "arr", new ArrayType(INTEGER), REGULAR, Optional.empty())), Optional.empty());