diff --git a/docs/src/main/sphinx/connector/delta-lake.md b/docs/src/main/sphinx/connector/delta-lake.md
index bf0de331f895..a0b8943180da 100644
--- a/docs/src/main/sphinx/connector/delta-lake.md
+++ b/docs/src/main/sphinx/connector/delta-lake.md
@@ -763,6 +763,51 @@ The output of the query has the following history columns:
   - Whether or not the operation appended data
 :::
 
+(delta-lake-partitions-table)=
+
+##### `$partitions` table
+
+The `$partitions` table provides a detailed overview of the partitions of the
+Delta Lake table.
+
+You can retrieve the information about the partitions of the Delta Lake table
+`test_table` by using the following query:
+
+```
+SELECT * FROM "test_table$partitions"
+```
+
+```text
+           partition           | file_count | total_size |                    data                     |
+-------------------------------+------------+------------+---------------------------------------------+
+ {_bigint=1, _date=2021-01-12} |          2 |        884 | {_decimal={min=1.0, max=2.0, null_count=0}} |
+ {_bigint=1, _date=2021-01-13} |          1 |        442 | {_decimal={min=1.0, max=1.0, null_count=0}} |
+```
+
+The output of the query has the following columns:
+
+:::{list-table} Partitions columns
+:widths: 20, 30, 50
+:header-rows: 1
+
+* - Name
+  - Type
+  - Description
+* - `partition`
+  - `ROW(...)`
+  - A row that contains the mapping of the partition column names to the
+    partition column values.
+* - `file_count`
+  - `BIGINT`
+  - The number of files mapped in the partition.
+* - `total_size`
+  - `BIGINT`
+  - The size of all the files in the partition.
+* - `data`
+  - `ROW(... ROW (min ..., max ... , null_count BIGINT))`
+  - A row that maps each non-partition column name to a row of the minimum
+    value, maximum value, and number of `NULL` values for that column in the
+    partition.
+:::
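+
+For example, to list the largest partitions first and render the `partition`
+and `data` rows in a more readable form, you can cast them to `JSON`. The
+following query is a usage sketch based on the example table above:
+
+```
+SELECT
+  CAST(partition AS JSON) AS partition,
+  file_count,
+  total_size,
+  CAST(data AS JSON) AS data
+FROM "test_table$partitions"
+ORDER BY total_size DESC
+```
+
+For a table without partition columns, the `$partitions` table contains no
+rows, and the `partition` column is omitted from its schema.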
+
 ##### `$properties` table
 
 The `$properties` table provides access to Delta Lake table configuration,
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
index f69a4fc9008b..788c0c973734 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
@@ -3690,10 +3690,10 @@ private static boolean isFileCreatedByQuery(Location file, String queryId)
     @Override
     public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName)
     {
-        return getRawSystemTable(tableName).map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader()));
+        return getRawSystemTable(session, tableName).map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader()));
     }
 
-    private Optional<SystemTable> getRawSystemTable(SchemaTableName systemTableName)
+    private Optional<SystemTable> getRawSystemTable(ConnectorSession session, SchemaTableName systemTableName)
     {
         Optional<DeltaLakeTableType> tableType = DeltaLakeTableName.tableTypeFrom(systemTableName.getTableName());
         if (tableType.isEmpty() || tableType.get() == DeltaLakeTableType.DATA) {
@@ -3723,6 +3723,7 @@ private Optional<SystemTable> getRawSystemTable(SchemaTableName systemTableName)
                     transactionLogAccess,
                     typeManager));
             case PROPERTIES -> Optional.of(new DeltaLakePropertiesTable(systemTableName, tableLocation, transactionLogAccess));
+            case PARTITIONS -> Optional.of(new DeltaLakePartitionsTable(session, systemTableName, tableLocation, transactionLogAccess, typeManager));
         };
     }
 
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePartitionsTable.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePartitionsTable.java
new file mode 100644
index 000000000000..1b8717fdd1fe
--- /dev/null
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePartitionsTable.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.deltalake;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
+import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
+import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
+import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
+import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
+import io.trino.plugin.deltalake.util.PageListBuilder;
+import io.trino.spi.Page;
+import io.trino.spi.TrinoException;
+import io.trino.spi.block.SqlRow;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorPageSource;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableMetadata;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.EmptyPageSource;
+import io.trino.spi.connector.FixedPageSource;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.connector.SystemTable;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.TypeManager;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandle;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Predicates.alwaysTrue;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static io.trino.plugin.deltalake.DeltaLakeColumnType.PARTITION_KEY;
+import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
+import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
+import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
+import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.deserializePartitionValue;
+import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
+import static io.trino.spi.block.RowValueBuilder.buildRowValue;
+import static io.trino.spi.function.InvocationConvention.InvocationArgumentConvention.NEVER_NULL;
+import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.FAIL_ON_NULL;
+import static io.trino.spi.function.InvocationConvention.simpleConvention;
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.TypeUtils.writeNativeValue;
+import static java.util.Objects.requireNonNull;
+
+public class DeltaLakePartitionsTable
+        implements SystemTable
+{
+    private final TableSnapshot tableSnapshot;
+    private final TransactionLogAccess transactionLogAccess;
+    private final TypeManager typeManager;
+    private final MetadataEntry metadataEntry;
+    private final ProtocolEntry protocolEntry;
+    private final ConnectorTableMetadata tableMetadata;
+    private final List<DeltaLakeColumnMetadata> schema;
+    private final List<DeltaLakeColumnHandle> partitionColumns;
+    private final List<RowType.Field> partitionFields;
+    private final List<DeltaLakeColumnHandle> regularColumns;
+    private final Optional<RowType> dataColumnType;
+    private final List<RowType> columnMetricTypes;
+
+    public DeltaLakePartitionsTable(
+            ConnectorSession session,
+            SchemaTableName tableName,
+            String tableLocation,
+            TransactionLogAccess transactionLogAccess,
+            TypeManager typeManager)
+    {
+        requireNonNull(tableName, "tableName is null");
+        requireNonNull(tableLocation, "tableLocation is null");
+        this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
+        this.typeManager = requireNonNull(typeManager, "typeManager is null");
+
+        try {
+            this.tableSnapshot = transactionLogAccess.loadSnapshot(session, tableName, tableLocation, Optional.empty());
+        }
+        catch (IOException e) {
+            throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Error getting snapshot from location: " + tableLocation, e);
+        }
+
+        this.metadataEntry = transactionLogAccess.getMetadataEntry(session, tableSnapshot);
+        this.protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot);
+        this.schema = extractSchema(metadataEntry, protocolEntry, typeManager);
+
+        this.partitionColumns = getPartitionColumns();
+        this.partitionFields = this.partitionColumns.stream()
+                .map(column -> RowType.field(column.getBaseColumnName(), column.getType()))
+                .collect(toImmutableList());
+
+        this.regularColumns = getColumns().stream()
+                .filter(column -> column.getColumnType() == REGULAR)
+                .collect(toImmutableList());
+        this.dataColumnType = getMetricsColumnType(regularColumns);
+
+        ImmutableList.Builder<ColumnMetadata> columnMetadataBuilder = ImmutableList.builder();
+
+        if (!this.partitionFields.isEmpty()) {
+            columnMetadataBuilder.add(new ColumnMetadata("partition", RowType.from(this.partitionFields)));
+        }
+
+        columnMetadataBuilder.add(new ColumnMetadata("file_count", BIGINT));
+        columnMetadataBuilder.add(new ColumnMetadata("total_size", BIGINT));
+
+        if (dataColumnType.isPresent()) {
+            columnMetadataBuilder.add(new ColumnMetadata("data", dataColumnType.get()));
+            this.columnMetricTypes = dataColumnType.get().getFields().stream()
+                    .map(RowType.Field::getType)
+                    .map(RowType.class::cast)
+                    .collect(toImmutableList());
+        }
+        else {
+            this.columnMetricTypes = ImmutableList.of();
+        }
+
+        this.tableMetadata = new ConnectorTableMetadata(tableName, columnMetadataBuilder.build());
+    }
+
+    @Override
+    public Distribution getDistribution()
+    {
+        return Distribution.SINGLE_COORDINATOR;
+    }
+
+    @Override
+    public ConnectorTableMetadata getTableMetadata()
+    {
+        return tableMetadata;
+    }
+
+    @Override
+    public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
+    {
+        if (partitionColumns.isEmpty()) {
+            return new EmptyPageSource();
+        }
+
+        return new FixedPageSource(buildPages(session));
+    }
+
+    private List<Page> buildPages(ConnectorSession session)
+    {
+        PageListBuilder pageListBuilder = PageListBuilder.forTable(tableMetadata);
+
+        Map<Map<String, Optional<String>>, DeltaLakePartitionStatistics> statisticsByPartition;
+        try (Stream<AddFileEntry> activeFiles = transactionLogAccess.loadActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, TupleDomain.all(), alwaysTrue())) {
+            statisticsByPartition = getStatisticsByPartition(activeFiles);
+        }
+
+        for (Map.Entry<Map<String, Optional<String>>, DeltaLakePartitionStatistics> partitionEntry : statisticsByPartition.entrySet()) {
+            Map<String, Optional<String>> partitionValue = partitionEntry.getKey();
+            DeltaLakePartitionStatistics deltaLakePartitionStatistics = partitionEntry.getValue();
+
+            RowType partitionValuesRowType = RowType.from(partitionFields);
+            SqlRow partitionValuesRow = buildRowValue(partitionValuesRowType, fields -> {
+                for (int i = 0; i < partitionColumns.size(); i++) {
+                    DeltaLakeColumnHandle column = partitionColumns.get(i);
+                    Type type = column.getType();
+                    Optional<String> value = partitionValue.get(column.getBasePhysicalColumnName());
+                    Object deserializedPartitionValue = deserializePartitionValue(column, value);
+                    writeNativeValue(type, fields.get(i), deserializedPartitionValue);
+                }
+            });
+
+            pageListBuilder.beginRow();
+
+            pageListBuilder.appendNativeValue(partitionValuesRowType, partitionValuesRow);
+            pageListBuilder.appendBigint(deltaLakePartitionStatistics.fileCount());
+            pageListBuilder.appendBigint(deltaLakePartitionStatistics.size());
+
+            dataColumnType.ifPresent(dataColumnType -> {
+                SqlRow dataColumnRow = buildRowValue(dataColumnType, fields -> {
+                    for (int i = 0; i < columnMetricTypes.size(); i++) {
+                        String fieldName = regularColumns.get(i).getBaseColumnName();
+                        Object min = deltaLakePartitionStatistics.minValues().getOrDefault(fieldName, null);
+                        Object max = deltaLakePartitionStatistics.maxValues().getOrDefault(fieldName, null);
+                        Long nullCount = deltaLakePartitionStatistics.nullCounts().getOrDefault(fieldName, null);
+                        RowType columnMetricType = columnMetricTypes.get(i);
+                        columnMetricType.writeObject(fields.get(i), getColumnMetricBlock(columnMetricType, min, max, nullCount));
+                    }
+                });
+                pageListBuilder.appendNativeValue(dataColumnType, dataColumnRow);
+            });
+
+            pageListBuilder.endRow();
+        }
+
+        return pageListBuilder.build();
+    }
+
+    private Map<Map<String, Optional<String>>, DeltaLakePartitionStatistics> getStatisticsByPartition(Stream<AddFileEntry> addFileEntryStream)
+    {
+        Map<Map<String, Optional<String>>, DeltaLakePartitionStatistics.Builder> partitionValueStatistics = new HashMap<>();
+
+        addFileEntryStream.forEach(addFileEntry -> {
+            Map<String, Optional<String>> partitionValues = addFileEntry.getCanonicalPartitionValues();
+            partitionValueStatistics.computeIfAbsent(partitionValues, key -> new DeltaLakePartitionStatistics.Builder(regularColumns, typeManager))
+                    .acceptAddFileEntry(addFileEntry);
+        });
+
+        return partitionValueStatistics.entrySet().stream()
+                .collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().build()));
+    }
+
+    private List<DeltaLakeColumnHandle> getPartitionColumns()
+    {
+        // the list of partition columns returned maintains the ordering of partitioned_by
+        Map<String, DeltaLakeColumnMetadata> columnsMetadataByName = schema.stream()
+                .collect(toImmutableMap(DeltaLakeColumnMetadata::name, Function.identity()));
+        return metadataEntry.getOriginalPartitionColumns().stream()
+                .map(partitionColumnName -> {
+                    DeltaLakeColumnMetadata columnMetadata = columnsMetadataByName.get(partitionColumnName);
+                    return new DeltaLakeColumnHandle(
+                            columnMetadata.name(),
+                            columnMetadata.type(),
+                            OptionalInt.empty(),
+                            columnMetadata.physicalName(),
+                            columnMetadata.physicalColumnType(),
+                            PARTITION_KEY,
+                            Optional.empty());
+                })
+                .collect(toImmutableList());
+    }
+
+    private List<DeltaLakeColumnHandle> getColumns()
+    {
+        return schema.stream()
+                .map(column -> {
+                    boolean isPartitionKey = metadataEntry.getOriginalPartitionColumns().contains(column.name());
+                    return new DeltaLakeColumnHandle(
+                            column.name(),
+                            column.type(),
+                            column.fieldId(),
+                            column.physicalName(),
+                            column.physicalColumnType(),
+                            isPartitionKey ? PARTITION_KEY : REGULAR,
+                            Optional.empty());
+                })
+                .collect(toImmutableList());
+    }
+
+    private static SqlRow getColumnMetricBlock(RowType columnMetricType, Object min, Object max, Long nullCount)
+    {
+        return buildRowValue(columnMetricType, fieldBuilders -> {
+            List<RowType.Field> fields = columnMetricType.getFields();
+            writeNativeValue(fields.get(0).getType(), fieldBuilders.get(0), min);
+            writeNativeValue(fields.get(1).getType(), fieldBuilders.get(1), max);
+            writeNativeValue(fields.get(2).getType(), fieldBuilders.get(2), nullCount);
+        });
+    }
+
+    private static Optional<RowType> getMetricsColumnType(List<DeltaLakeColumnHandle> columns)
+    {
+        List<RowType.Field> metricColumns = columns.stream()
+                .map(column -> RowType.field(
+                        column.getBaseColumnName(),
+                        RowType.from(ImmutableList.of(
+                                new RowType.Field(Optional.of("min"), column.getType()),
+                                new RowType.Field(Optional.of("max"), column.getType()),
+                                new RowType.Field(Optional.of("null_count"), BIGINT)))))
+                .collect(toImmutableList());
+        if (metricColumns.isEmpty()) {
+            return Optional.empty();
+        }
+        return Optional.of(RowType.from(metricColumns));
+    }
+
+    private record DeltaLakePartitionStatistics(long fileCount, long size, Map<String, Object> minValues, Map<String, Object> maxValues, Map<String, Long> nullCounts)
+    {
+        private DeltaLakePartitionStatistics
+        {
+            minValues = ImmutableMap.copyOf(requireNonNull(minValues, "minValues is null"));
+            maxValues = ImmutableMap.copyOf(requireNonNull(maxValues, "maxValues is null"));
+            nullCounts = ImmutableMap.copyOf(requireNonNull(nullCounts, "nullCounts is null"));
+        }
+
+        private static class Builder
+        {
+            private final List<DeltaLakeColumnHandle> columns;
+            private final TypeManager typeManager;
+            private long fileCount;
+            private long size;
+            private final Map<String, ColumnStatistics> columnStatistics = new HashMap<>();
+            private final Map<String, Long> nullCounts = new HashMap<>();
+            private boolean ignoreDataColumn;
+
+            public Builder(List<DeltaLakeColumnHandle> columns, TypeManager typeManager)
+            {
+                this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
+                this.typeManager = requireNonNull(typeManager, "typeManager is null");
+            }
+
+            public void acceptAddFileEntry(AddFileEntry addFileEntry)
+            {
+                // skip files that carry a deletion vector; their statistics no longer reflect the live rows
+                if (addFileEntry.getDeletionVector().isPresent()) {
+                    return;
+                }
+
+                fileCount++;
+                size += addFileEntry.getSize();
+
+                if (ignoreDataColumn) {
+                    return;
+                }
+
+                addFileEntry.getStats().ifPresentOrElse(stats -> {
+                    for (DeltaLakeColumnHandle column : columns) {
+                        updateMinMaxStats(
+                                column.getBaseColumnName(),
+                                column.getType(),
+                                stats.getMinColumnValue(column).orElse(null),
+                                stats.getMaxColumnValue(column).orElse(null),
+                                stats.getNumRecords().orElse(0L));
+                        updateNullCountStats(column.getBaseColumnName(), stats.getNullCount(column.getBasePhysicalColumnName()).orElse(null));
+                    }
+                }, () -> {
+                    // a single file without statistics invalidates the data column for the whole partition
+                    columnStatistics.clear();
+                    nullCounts.clear();
+                    ignoreDataColumn = true;
+                });
+            }
+
+            public DeltaLakePartitionStatistics build()
+            {
+                ImmutableMap.Builder<String, Object> minValues = ImmutableMap.builder();
+                ImmutableMap.Builder<String, Object> maxValues = ImmutableMap.builder();
+
+                columnStatistics.forEach((key, statistics) -> {
+                    statistics.getMin().ifPresent(min -> minValues.put(key, min));
+                    statistics.getMax().ifPresent(max -> maxValues.put(key, max));
+                });
+
+                return new DeltaLakePartitionStatistics(fileCount, size, minValues.buildOrThrow(), maxValues.buildOrThrow(), ImmutableMap.copyOf(nullCounts));
+            }
+
+            private void updateNullCountStats(String key, Long nullCount)
+            {
+                if (nullCount != null) {
+                    nullCounts.merge(key, nullCount, Long::sum);
+                }
+            }
+
+            private void updateMinMaxStats(
+                    String key,
+                    Type type,
+                    Object lowerBound,
+                    Object upperBound,
+                    long recordCount)
+            {
+                if (type.isOrderable() && recordCount != 0L) {
+                    // Capture the initial bounds during construction so there are always valid min/max values to compare to.
+                    // This does make the first call to `ColumnStatistics#updateMinMax` a no-op.
+                    columnStatistics.computeIfAbsent(key, ignored -> {
+                        MethodHandle comparisonHandle = typeManager.getTypeOperators()
+                                .getComparisonUnorderedLastOperator(type, simpleConvention(FAIL_ON_NULL, NEVER_NULL, NEVER_NULL));
+                        return new ColumnStatistics(comparisonHandle, lowerBound, upperBound);
+                    }).updateMinMax(lowerBound, upperBound);
+                }
+            }
+
+            private static class ColumnStatistics
+            {
+                private final MethodHandle comparisonHandle;
+
+                private Optional<Object> min;
+                private Optional<Object> max;
+
+                public ColumnStatistics(MethodHandle comparisonHandle, Object initialMin, Object initialMax)
+                {
+                    this.comparisonHandle = requireNonNull(comparisonHandle, "comparisonHandle is null");
+                    this.min = Optional.ofNullable(initialMin);
+                    this.max = Optional.ofNullable(initialMax);
+                }
+
+                /**
+                 * Gets the minimum value accumulated during stats collection.
+                 *
+                 * @return Empty if the statistics contained values which were not comparable, otherwise returns the min value.
+                 */
+                public Optional<Object> getMin()
+                {
+                    return min;
+                }
+
+                /**
+                 * Gets the maximum value accumulated during stats collection.
+                 *
+                 * @return Empty if the statistics contained values which were not comparable, otherwise returns the max value.
+                 */
+                public Optional<Object> getMax()
+                {
+                    return max;
+                }
+
+                /**
+                 * Update the stats, as long as they haven't already been invalidated.
+                 *
+                 * @param lowerBound Trino encoded lower bound value from a file
+                 * @param upperBound Trino encoded upper bound value from a file
+                 */
+                public void updateMinMax(Object lowerBound, Object upperBound)
+                {
+                    if (min.isPresent()) {
+                        if (lowerBound == null) {
+                            min = Optional.empty();
+                        }
+                        else if (compareTrinoValue(lowerBound, min.get()) < 0) {
+                            min = Optional.of(lowerBound);
+                        }
+                    }
+
+                    if (max.isPresent()) {
+                        if (upperBound == null) {
+                            max = Optional.empty();
+                        }
+                        else if (compareTrinoValue(upperBound, max.get()) > 0) {
+                            max = Optional.of(upperBound);
+                        }
+                    }
+                }
+
+                private long compareTrinoValue(Object value, Object otherValue)
+                {
+                    try {
+                        return (Long) comparisonHandle.invoke(value, otherValue);
+                    }
+                    catch (Throwable throwable) {
+                        throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unable to compare Delta min/max values", throwable);
+                    }
+                }
+            }
+        }
+    }
+}
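Two behaviors of the statistics builder above are easy to miss: a file that carries a deletion vector is skipped entirely, and a single active file without column statistics clears the `data` metrics for its whole partition. The following sketch illustrates the second case with a hypothetical table `t` (the table name and values are illustrative; the table property and the shape of the output mirror the `testTablePartitionsWithNoColumnStats` product test further down):

```sql
-- Delta/Spark side: create a table that collects no per-column statistics.
CREATE TABLE default.t (a INT, b INT)
USING delta
PARTITIONED BY (a)
TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = 0);

INSERT INTO default.t VALUES (1, 11);

-- Trino side: file_count and total_size are still populated, but every
-- min/max/null_count in the data column comes back as null.
SELECT CAST(partition AS JSON), file_count, total_size, CAST(data AS JSON)
FROM delta.default."t$partitions";
-- => {"a":1} | 1 | <size> | {"b":{"min":null,"max":null,"null_count":null}}
```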
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableType.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableType.java
index 595e5230ac8c..d1fac99083fa 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableType.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableType.java
@@ -18,4 +18,5 @@ public enum DeltaLakeTableType
     DATA,
     HISTORY,
     PROPERTIES,
+    PARTITIONS,
 }
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java
index 475e2c623313..3a3d0db701e1 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java
@@ -382,7 +382,7 @@ public Stream<AddFileEntry> getActiveFiles(
         }
     }
 
-    private Stream<AddFileEntry> loadActiveFiles(
+    public Stream<AddFileEntry> loadActiveFiles(
             ConnectorSession session,
             TableSnapshot tableSnapshot,
             MetadataEntry metadataEntry,
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/util/PageListBuilder.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/util/PageListBuilder.java
index 523d72040879..505d6b97eb0e 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/util/PageListBuilder.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/util/PageListBuilder.java
@@ -32,6 +32,7 @@
 import static io.trino.spi.type.BooleanType.BOOLEAN;
 import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
 import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
+import static io.trino.spi.type.TypeUtils.writeNativeValue;
 import static io.trino.spi.type.VarcharType.VARCHAR;
 
 public final class PageListBuilder
@@ -127,6 +128,11 @@ public void appendVarcharVarcharMap(Map<String, String> values)
         }));
     }
 
+    public void appendNativeValue(Type type, Object object)
+    {
+        writeNativeValue(type, nextColumn(), object);
+    }
+
     public BlockBuilder nextColumn()
     {
         int currentChannel = channel;
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeSharedMetastoreWithTableRedirectionsTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeSharedMetastoreWithTableRedirectionsTest.java
index 1bc9f0cddcc5..6a000a020ffe 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeSharedMetastoreWithTableRedirectionsTest.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeSharedMetastoreWithTableRedirectionsTest.java
@@ -95,4 +95,11 @@ public void testPropertiesTable()
         assertThat(query("SELECT * FROM delta_with_redirections." + schema + ".\"delta_table$properties\""))
                 .matches("SELECT * FROM hive_with_redirections." + schema + ".\"delta_table$properties\"");
     }
+
+    @Test
+    public void testPartitionsTable()
+    {
+        assertThat(query("SELECT * FROM delta_with_redirections." + schema + ".\"delta_table$partitions\""))
+                .matches("SELECT * FROM hive_with_redirections." + schema + ".\"delta_table$partitions\"");
+    }
 }
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java
index 7bdd52f5ecda..55a612fd737b 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java
@@ -978,6 +978,7 @@ private void testCorruptedTableLocation(String tableName, Path tableLocation, boolean partitioned)
         assertQueryFails("TABLE " + tableName, "Metadata not found in transaction log for tpch." + tableName);
         assertQueryFails("SELECT * FROM \"" + tableName + "$history\"", "Metadata not found in transaction log for tpch." + tableName);
         assertQueryFails("SELECT * FROM \"" + tableName + "$properties\"", "Metadata not found in transaction log for tpch." + tableName);
+        assertQueryFails("SELECT * FROM \"" + tableName + "$partitions\"", "Metadata not found in transaction log for tpch." + tableName + "\\$partitions");
         assertQueryFails("SELECT * FROM " + tableName + " WHERE false", "Metadata not found in transaction log for tpch." + tableName);
         assertQueryFails("SELECT 1 FROM " + tableName + " WHERE false", "Metadata not found in transaction log for tpch." + tableName);
         assertQueryFails("SHOW CREATE TABLE " + tableName, "Metadata not found in transaction log for tpch." + tableName);
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePartitioning.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePartitioning.java
index c052ddb4e370..aa03e0929781 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePartitioning.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePartitioning.java
@@ -21,6 +21,7 @@
 import static io.trino.testing.TestingNames.randomNameSuffix;
 import static java.lang.String.format;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
 
 @TestInstance(PER_CLASS)
@@ -139,11 +140,39 @@ public void testReadAllTypes()
     }
 
     @Test
-    public void testPartitionsSystemTableDoesNotExist()
+    public void testReadAllTypesPartitionsSystemTable()
     {
-        assertQueryFails(
-                "SELECT * FROM \"partitions$partitions\"",
-                ".*'delta\\.tpch\\.\"partitions\\$partitions\"' does not exist");
+        assertThat(
+                query("SELECT " +
+                        "partition.p_string, " +
+                        "partition.p_byte, " +
+                        "partition.p_short, " +
+                        "partition.p_int, " +
+                        "partition.p_long, " +
+                        "partition.p_decimal, " +
+                        "partition.p_boolean, " +
+                        "partition.p_float, " +
+                        "partition.p_double, " +
+                        "partition.p_date, " +
+                        "partition.p_timestamp, " +
+                        "file_count, " +
+                        "total_size " +
+                        "FROM \"partitions$partitions\" "))
+                .matches("VALUES (" +
+                        "VARCHAR 'Alice', " +
+                        "TINYINT '123', " +
+                        "SMALLINT '12345', " +
+                        "123456789, " +
+                        "1234567890123456789, " +
+                        "12345678901234567890.123456789012345678, " +
+                        "true, " +
+                        "REAL '3.1415927', " +
+                        "DOUBLE '3.141592653589793', " +
+                        "DATE '2014-01-01', " +
+                        "TIMESTAMP '2014-01-01 23:00:01.123 UTC', " +
+                        "BIGINT '30', " +
+                        "BIGINT '136080' " +
+                        ")");
     }
 
     @Test
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java
index 97c8dc48f25e..9975bafb0ea4 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java
@@ -13,10 +13,19 @@
  */
 package io.trino.plugin.deltalake;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Resources;
 import io.trino.testing.AbstractTestQueryFramework;
 import io.trino.testing.QueryRunner;
+import org.intellij.lang.annotations.Language;
 import org.junit.jupiter.api.Test;
 
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static io.trino.plugin.deltalake.TestingDeltaLakeUtils.copyDirectoryContents;
+import static io.trino.testing.TestingNames.randomNameSuffix;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class TestDeltaLakeSystemTables
@@ -27,6 +36,7 @@ protected QueryRunner createQueryRunner()
             throws Exception
     {
         return DeltaLakeQueryRunner.builder()
+                .addDeltaProperty("delta.register-table-procedure.enabled", "true")
                 .addDeltaProperty("delta.enable-non-concurrent-writes", "true")
                 .build();
     }
@@ -99,4 +109,379 @@ public void testPropertiesTable()
             assertUpdate("DROP TABLE IF EXISTS " + tableName);
         }
     }
+
+    @Test
+    public void testPartitionsTable()
+    {
+        String tableName = "test_simple_partitions_table_" + randomNameSuffix();
+        try {
+            assertUpdate("CREATE TABLE " + tableName + "(_bigint BIGINT, _date DATE) WITH (partitioned_by = ARRAY['_date'])");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (0, CAST('2019-09-08' AS DATE)), (1, CAST('2019-09-09' AS DATE)), (2, CAST('2019-09-09' AS DATE))", 3);
+            assertUpdate("INSERT INTO " + tableName + " VALUES (3, CAST('2019-09-09' AS DATE)), (4, CAST('2019-09-10' AS DATE)), (5, CAST('2019-09-10' AS DATE))", 3);
+            assertUpdate("INSERT INTO " + tableName + " VALUES (6, NULL)", 1);
+            assertQuery("SELECT count(*) FROM " + tableName, "VALUES 7");
+            assertQuery("SELECT count(*) FROM \"" + tableName + "$partitions\"", "VALUES 4");
+
+            assertQuery("SHOW COLUMNS FROM \"" + tableName + "$partitions\"",
+                    """
+                    VALUES
+                    ('partition', 'row(_date date)', '', ''),
+                    ('file_count', 'bigint', '', ''),
+                    ('total_size', 'bigint', '', ''),
+                    ('data', 'row(_bigint row(min bigint, max bigint, null_count bigint))', '', '')
+                    """);
+
+            assertQuery("SELECT partition._date FROM \"" + tableName + "$partitions\"", " VALUES DATE '2019-09-08', DATE '2019-09-09', DATE '2019-09-10', NULL");
+
+            assertThat(query("SELECT CAST(data._bigint AS ROW(BIGINT, BIGINT, BIGINT)) FROM \"" + tableName + "$partitions\""))
+                    .matches("""
+                            VALUES
+                            ROW(ROW(BIGINT '0', BIGINT '0', BIGINT '0')),
+                            ROW(ROW(BIGINT '1', BIGINT '3', BIGINT '0')),
+                            ROW(ROW(BIGINT '4', BIGINT '5', BIGINT '0')),
+                            ROW(ROW(BIGINT '6', BIGINT '6', BIGINT '0'))
+                            """);
+
+            assertUpdate("INSERT INTO " + tableName + " VALUES (NULL, CAST('2019-09-09' AS DATE))", 1);
+            assertQuery("SELECT count(*) FROM " + tableName, "VALUES 8");
+            assertThat(query("SELECT CAST(data._bigint AS ROW(BIGINT, BIGINT, BIGINT)) FROM \"" + tableName + "$partitions\""))
+                    .matches("""
+                            VALUES
+                            ROW(ROW(BIGINT '0', BIGINT '0', BIGINT '0')),
+                            ROW(ROW(BIGINT '4', BIGINT '5', BIGINT '0')),
+                            ROW(ROW(BIGINT '6', BIGINT '6', BIGINT '0')),
+                            ROW(ROW(NULL, NULL, BIGINT '1'))
+                            """);
+        }
+        finally {
+            assertUpdate("DROP TABLE IF EXISTS " + tableName);
+        }
+    }
+
+    /**
+     * @see databricks133.partition_values_parsed_case_sensitive
+     */
+    @Test
+    public void testPartitionsTableCaseSensitiveColumns()
+            throws Exception
+    {
+        String tableName = "test_partitions_table_case_sensitive_columns_" + randomNameSuffix();
+        Path tableLocation = Files.createTempFile(tableName, null);
+        copyDirectoryContents(new File(Resources.getResource("databricks133/partition_values_parsed_case_sensitive").toURI()).toPath(), tableLocation);
+        assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri()));
+
+        assertQuery("SELECT count(*) FROM " + tableName, "VALUES 3");
+        assertQuery("SELECT * FROM " + tableName, "VALUES (100, 1, 'ala'), (200, 2, 'kota'), (300, 3, 'osla')");
+
+        assertQuery("SELECT count(*) FROM \"" + tableName + "$partitions\"", "VALUES 3");
+
+        assertQuery("SHOW COLUMNS FROM \"" + tableName + "$partitions\"",
+                """
+                VALUES
+                ('partition', 'row(part_NuMbEr integer, part_StRiNg varchar)', '', ''),
+                ('file_count', 'bigint', '', ''),
+                ('total_size', 'bigint', '', ''),
+                ('data', 'row(id row(min integer, max integer, null_count bigint))', '', '')
+                """);
+
+        assertQuery("SELECT partition.part_NuMbEr, partition.part_StRiNg FROM \"" + tableName + "$partitions\"", "VALUES (1, 'ala'), (2, 'kota'), (3, 'osla')");
+
+        assertThat(query("SELECT CAST(data.id AS ROW(INTEGER, INTEGER, BIGINT)) FROM \"" + tableName + "$partitions\""))
tableName + "$partitions\"")) + .matches("VALUES ROW(ROW(100, 100, BIGINT '0')), ROW(ROW(200, 200, BIGINT '0')), ROW(ROW(300, 300, BIGINT '0'))"); + + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 1, 'ala'), (2, 2, 'kota'), (3, 3, 'osla')", 3); + assertThat(query("SELECT CAST(data.id AS ROW(INTEGER, INTEGER, BIGINT)) FROM \"" + tableName + "$partitions\"")) + .matches("VALUES ROW(ROW(1, 100, BIGINT '0')), ROW(ROW(2, 200, BIGINT '0')), ROW(ROW(3, 300, BIGINT '0'))"); + } + + @Test + public void testColumnMappingModePartitionsTable() + { + for (String columnMappingMode : ImmutableList.of("id", "name", "none")) { + testColumnMappingModePartitionsTable(columnMappingMode); + } + } + + private void testColumnMappingModePartitionsTable(String columnMappingMode) + { + String tableName = "test_simple_column_mapping_mode_" + columnMappingMode + "_partitions_table_" + randomNameSuffix(); + try { + assertUpdate("CREATE TABLE " + tableName + "(_bigint BIGINT, _date DATE) WITH (column_mapping_mode = '" + columnMappingMode + "', partitioned_by = ARRAY['_date'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (0, CAST('2019-09-08' AS DATE)), (1, CAST('2019-09-09' AS DATE)), (2, CAST('2019-09-09' AS DATE))", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, CAST('2019-09-09' AS DATE)), (4, CAST('2019-09-10' AS DATE)), (5, CAST('2019-09-10' AS DATE))", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (6, NULL), (NULL, CAST('2019-09-08' AS DATE)), (NULL, CAST('2019-09-08' AS DATE))", 3); + + assertQuerySucceeds("SELECT * FROM " + tableName); + assertQuery("SELECT count(*) FROM " + tableName, "VALUES 9"); + assertQuery("SELECT count(*) FROM \"" + tableName + "$partitions\"", "VALUES 4"); + + assertQuery("SHOW COLUMNS FROM \"" + tableName + "$partitions\"", + """ + VALUES + ('partition', 'row(_date date)', '', ''), + ('file_count', 'bigint', '', ''), + ('total_size', 'bigint', '', ''), + ('data', 'row(_bigint row(min bigint, max bigint, null_count bigint))', '', '') + """); + + assertQuery("SELECT partition._date FROM \"" + tableName + "$partitions\"", " VALUES DATE '2019-09-08', DATE '2019-09-09', DATE '2019-09-10', NULL"); + + assertThat(query("SELECT CAST(data._bigint AS ROW(BIGINT, BIGINT, BIGINT)) FROM \"" + tableName + "$partitions\"")) + .matches(""" + VALUES + ROW(ROW(BIGINT '1', BIGINT '3', BIGINT '0')), + ROW(ROW(BIGINT '4', BIGINT '5', BIGINT '0')), + ROW(ROW(BIGINT '6', BIGINT '6', BIGINT '0')), + ROW(ROW(CAST(NULL AS BIGINT), CAST(NULL AS BIGINT), BIGINT '2')) + """); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } + + @Test + public void testPartitionsTableMultipleColumns() + { + String tableName = "test_partitions_table_multiple_columns_" + randomNameSuffix(); + try { + assertUpdate("CREATE TABLE " + tableName + "(_bigint BIGINT, _date DATE, _varchar VARCHAR) WITH (partitioned_by = ARRAY['_date', '_varchar'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (0, CAST('2019-09-08' AS DATE), 'a'), (1, CAST('2019-09-09' AS DATE), 'b'), (2, CAST('2019-09-09' AS DATE), 'c')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, CAST('2019-09-09' AS DATE), 'd'), (4, CAST('2019-09-10' AS DATE), 'e'), (5, CAST('2019-09-10' AS DATE), 'f'), (4, CAST('2019-09-10' AS DATE), 'f')", 4); + assertUpdate("INSERT INTO " + tableName + " VALUES (6, null, 'g'), (6, CAST('2019-09-10' AS DATE), null), (7, null, null), (8, null, 'g')", 4); + assertUpdate("UPDATE " + tableName + " SET _bigint = 50 WHERE _bigint = BIGINT '5'", 1); + 
assertUpdate("DELETE FROM " + tableName + " WHERE _date = DATE '2019-09-08'", 1); + assertQuerySucceeds("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + assertQuery("SELECT count(*) FROM " + tableName, "VALUES 10"); + assertQuery("SELECT count(*) FROM \"" + tableName + "$partitions\"", "VALUES 8"); + assertQuery("SELECT count(partition._varchar) FROM \"" + tableName + "$partitions\"", "VALUES 6"); + assertQuery("SELECT count(distinct partition._date) FROM \"" + tableName + "$partitions\"", "VALUES 2"); + + assertQuery("SHOW COLUMNS FROM \"" + tableName + "$partitions\"", + """ + VALUES + ('partition', 'row(_date date, _varchar varchar)', '', ''), + ('file_count', 'bigint', '', ''), + ('total_size', 'bigint', '', ''), + ('data', 'row(_bigint row(min bigint, max bigint, null_count bigint))', '', '') + """); + + assertQuery( + "SELECT partition._date, partition._varchar FROM \"" + tableName + "$partitions\"", + """ + VALUES + (DATE '2019-09-09', 'b'), + (DATE '2019-09-09', 'c'), + (DATE '2019-09-09', 'd'), + (DATE '2019-09-10', 'e'), + (DATE '2019-09-10', 'f'), + (DATE '2019-09-10', null), + (null, 'g'), + (null, null) + """); + + assertThat(query("SELECT CAST(data._bigint AS ROW(BIGINT, BIGINT, BIGINT)) FROM \"" + tableName + "$partitions\"")) + .matches(""" + VALUES + ROW(ROW(BIGINT '1', BIGINT '1', BIGINT '0')), + ROW(ROW(BIGINT '2', BIGINT '2', BIGINT '0')), + ROW(ROW(BIGINT '3', BIGINT '3', BIGINT '0')), + ROW(ROW(BIGINT '4', BIGINT '4', BIGINT '0')), + ROW(ROW(BIGINT '4', BIGINT '50', BIGINT '0')), + ROW(ROW(BIGINT '6', BIGINT '6', BIGINT '0')), + ROW(ROW(BIGINT '6', BIGINT '8', BIGINT '0')), + ROW(ROW(BIGINT '7', BIGINT '7', BIGINT '0')) + """); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } + + @Test + public void testPartitionsTableDifferentOrderFromDefinitionMultipleColumns() + { + String tableName = "test_partitions_table_different_order_from_definition_multiple_columns_" + randomNameSuffix(); + try { + assertUpdate("CREATE TABLE " + tableName + "(_bigint BIGINT, _date DATE, _varchar VARCHAR) WITH (partitioned_by = ARRAY['_varchar', '_date'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (0, CAST('2019-09-08' AS DATE), 'a'), (1, CAST('2019-09-09' AS DATE), 'b'), (2, CAST('2019-09-09' AS DATE), 'c')", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, CAST('2019-09-09' AS DATE), 'd'), (4, CAST('2019-09-10' AS DATE), 'e'), (5, CAST('2019-09-10' AS DATE), 'f'), (4, CAST('2019-09-10' AS DATE), 'f')", 4); + assertUpdate("INSERT INTO " + tableName + " VALUES (6, null, 'g'), (6, CAST('2019-09-10' AS DATE), null), (7, null, null), (8, null, 'g')", 4); + assertUpdate("UPDATE " + tableName + " SET _bigint = 50 WHERE _bigint = BIGINT '5'", 1); + assertUpdate("DELETE FROM " + tableName + " WHERE _date = DATE '2019-09-08'", 1); + assertQuerySucceeds("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + assertQuery("SELECT count(*) FROM " + tableName, "VALUES 10"); + assertQuery("SELECT count(*) FROM \"" + tableName + "$partitions\"", "VALUES 8"); + assertQuery("SELECT count(partition._varchar) FROM \"" + tableName + "$partitions\"", "VALUES 6"); + assertQuery("SELECT count(distinct partition._date) FROM \"" + tableName + "$partitions\"", "VALUES 2"); + + assertQuery("SHOW COLUMNS FROM \"" + tableName + "$partitions\"", + """ + VALUES + ('partition', 'row(_varchar varchar, _date date)', '', ''), + ('file_count', 'bigint', '', ''), + ('total_size', 'bigint', '', ''), + ('data', 'row(_bigint row(min bigint, max bigint, null_count bigint))', '', 
+                    """);
+
+            assertQuery(
+                    "SELECT partition._varchar, partition._date FROM \"" + tableName + "$partitions\"",
+                    """
+                    VALUES
+                    ('b', DATE '2019-09-09'),
+                    ('c', DATE '2019-09-09'),
+                    ('d', DATE '2019-09-09'),
+                    ('e', DATE '2019-09-10'),
+                    ('f', DATE '2019-09-10'),
+                    (null, DATE '2019-09-10'),
+                    ('g', null),
+                    (null, null)
+                    """);
+
+            assertThat(query("SELECT CAST(data._bigint AS ROW(BIGINT, BIGINT, BIGINT)) FROM \"" + tableName + "$partitions\""))
+                    .matches("""
+                            VALUES
+                            ROW(ROW(BIGINT '1', BIGINT '1', BIGINT '0')),
+                            ROW(ROW(BIGINT '2', BIGINT '2', BIGINT '0')),
+                            ROW(ROW(BIGINT '3', BIGINT '3', BIGINT '0')),
+                            ROW(ROW(BIGINT '4', BIGINT '4', BIGINT '0')),
+                            ROW(ROW(BIGINT '4', BIGINT '50', BIGINT '0')),
+                            ROW(ROW(BIGINT '6', BIGINT '6', BIGINT '0')),
+                            ROW(ROW(BIGINT '6', BIGINT '8', BIGINT '0')),
+                            ROW(ROW(BIGINT '7', BIGINT '7', BIGINT '0'))
+                            """);
+        }
+        finally {
+            assertUpdate("DROP TABLE IF EXISTS " + tableName);
+        }
+    }
+
+    @Test
+    public void testPartitionsTableColumnTypes()
+    {
+        // TODO: add support for BOOLEAN, TIMESTAMP, and VARBINARY column types https://github.com/trinodb/trino/issues/21878
+        testPartitionsTableColumnTypes("BOOLEAN", "VALUES (true, 'a'), (false, 'a'), (false, 'b'), (false, 'b')", 4, """
+                VALUES
+                ROW(ROW(CAST(NULL AS BOOLEAN), CAST(NULL AS BOOLEAN), BIGINT '0')),
+                ROW(ROW(CAST(NULL AS BOOLEAN), CAST(NULL AS BOOLEAN), BIGINT '0'))
+                """);
+        testPartitionsTableColumnTypes("INTEGER", "VALUES (3, 'a'), (6, 'a'), (0, 'b'), (9, 'b')", 4, """
+                VALUES
+                ROW(ROW(0, 9, BIGINT '0')),
+                ROW(ROW(3, 6, BIGINT '0'))
+                """);
+        testPartitionsTableColumnTypes("TINYINT", "VALUES (3, 'a'), (6, 'a'), (0, 'b'), (9, 'b')", 4, """
+                VALUES
+                ROW(ROW(TINYINT '0', TINYINT '9', BIGINT '0')),
+                ROW(ROW(TINYINT '3', TINYINT '6', BIGINT '0'))
+                """);
+        testPartitionsTableColumnTypes("SMALLINT", "VALUES (3, 'a'), (6, 'a'), (0, 'b'), (9, 'b')", 4, """
+                VALUES
+                ROW(ROW(SMALLINT '0', SMALLINT '9', BIGINT '0')),
+                ROW(ROW(SMALLINT '3', SMALLINT '6', BIGINT '0'))
+                """);
+        testPartitionsTableColumnTypes("BIGINT", "VALUES (3, 'a'), (6, 'a'), (0, 'b'), (9, 'b')", 4, """
+                VALUES
+                ROW(ROW(BIGINT '0', BIGINT '9', BIGINT '0')),
+                ROW(ROW(BIGINT '3', BIGINT '6', BIGINT '0'))
+                """);
+        testPartitionsTableColumnTypes("REAL", "VALUES (10.3, 'a'), (15.7, 'a'), (3.2, 'b'), (6.1, 'b')", 4, """
+                VALUES
+                ROW(ROW(REAL '3.2', REAL '6.1', BIGINT '0')),
+                ROW(ROW(REAL '10.3', REAL '15.7', BIGINT '0'))
+                """);
+        testPartitionsTableColumnTypes("DOUBLE", "VALUES (3.2, 'a'), (7.25, 'a'), (7.25, 'b'), (18.9382, 'b')", 4, """
+                VALUES
+                ROW(ROW(DOUBLE '3.2', DOUBLE '7.25', BIGINT '0')),
+                ROW(ROW(DOUBLE '7.25', DOUBLE '18.9382', BIGINT '0'))
+                """);
+        testPartitionsTableColumnTypes("DECIMAL(10)", "VALUES (5.6, 'a'), (1.2, 'a'), (532.62, 'b'), (153.27, 'b')", 4, """
+                VALUES
+                ROW(ROW(CAST(1.2 AS DECIMAL(10)), CAST(5.6 AS DECIMAL(10)), BIGINT '0')),
+                ROW(ROW(CAST(153.27 AS DECIMAL(10)), CAST(532.62 AS DECIMAL(10)), BIGINT '0'))
+                """);
+        testPartitionsTableColumnTypes("DECIMAL(20)", "VALUES (0.64525495002404036507, 'a'), (0.77003757467454995626, 'a'), (0.05016312397354421814, 'b'), (0.69575427222174470843, 'b')", 4, """
+                VALUES
+                ROW(ROW(CAST(0.05016312397354421814 AS DECIMAL(20)), CAST(0.69575427222174470843 AS DECIMAL(20)), BIGINT '0')),
+                ROW(ROW(CAST(0.64525495002404036507 AS DECIMAL(20)), CAST(0.77003757467454995626 AS DECIMAL(20)), BIGINT '0'))
+                """);
+        testPartitionsTableColumnTypes("DATE", "VALUES (CAST('2019-09-08' AS DATE), 'a'), (CAST('2020-09-08' AS DATE), 'a'), (CAST('2019-09-07' AS DATE), 'b'), (CAST('2019-09-08' AS DATE), 'b')", 4, """
+                VALUES
+                ROW(ROW(DATE '2019-09-07', DATE '2019-09-08', BIGINT '0')),
+                ROW(ROW(DATE '2019-09-08', DATE '2020-09-08', BIGINT '0'))
+                """);
+        testPartitionsTableColumnTypes("TIMESTAMP(6)", "VALUES (TIMESTAMP '2001-05-06 12:34:56.123456', 'a'), (TIMESTAMP '2001-05-06 12:34:56.567890', 'a'), (TIMESTAMP '2001-05-06 12:34:56.123456', 'b'), (TIMESTAMP '2001-05-06 12:34:56.123457', 'b')", 4, """
+                VALUES
+                ROW(ROW(CAST(NULL AS TIMESTAMP(6)), CAST(NULL AS TIMESTAMP(6)), BIGINT '0')),
+                ROW(ROW(CAST(NULL AS TIMESTAMP(6)), CAST(NULL AS TIMESTAMP(6)), BIGINT '0'))
+                """);
+        testPartitionsTableColumnTypes("TIMESTAMP(3) WITH TIME ZONE", "VALUES (TIMESTAMP '2001-05-06 12:34:56.123 UTC', 'a'), (TIMESTAMP '2001-05-06 12:34:56.234 -08:30', 'a'), (TIMESTAMP '2001-05-06 12:34:56.567 GMT-08:30', 'b'), (TIMESTAMP '2001-05-06 12:34:56.789 America/New_York', 'b')", 4, """
+                VALUES
+                ROW(ROW(TIMESTAMP '2001-05-06 12:34:56.123 UTC', TIMESTAMP '2001-05-06 21:04:56.234 UTC', BIGINT '0')),
+                ROW(ROW(TIMESTAMP '2001-05-06 16:34:56.789 UTC', TIMESTAMP '2001-05-06 21:04:56.567 UTC', BIGINT '0'))
+                """);
+        testPartitionsTableColumnTypes("VARCHAR", "VALUES ('z', 'a'), ('x', 'a'), ('a', 'b'), ('b', 'b')", 4, """
+                VALUES
+                ROW(ROW(CAST('a' AS VARCHAR), CAST('b' AS VARCHAR), BIGINT '0')),
+                ROW(ROW(CAST('x' AS VARCHAR), CAST('z' AS VARCHAR), BIGINT '0'))
+                """);
+        testPartitionsTableColumnTypes("VARBINARY", "VALUES (VARBINARY 'abcd', 'a'), (VARBINARY 'jkl', 'a'), (VARBINARY 'mno', 'b'), (VARBINARY 'xyzz', 'b')", 4, """
+                VALUES
+                ROW(ROW(CAST(NULL AS VARBINARY), CAST(NULL AS VARBINARY), BIGINT '0')),
+                ROW(ROW(CAST(NULL AS VARBINARY), CAST(NULL AS VARBINARY), BIGINT '0'))
+                """);
+        testPartitionsTableColumnTypes("ARRAY(INTEGER)", "VALUES (ARRAY[3, 2, null, 5, null, 1, 2], 'a'), (ARRAY[null, 1, 3, 5, 7, 9, 11], 'a'), (ARRAY[7, 3, 2, 6, 5, 4, 3], 'b'), (ARRAY[2, 6, 3, 5, null, 1, 6], 'b')", 4, """
+                VALUES
+                ROW(ROW(CAST(NULL AS ARRAY(INTEGER)), CAST(NULL AS ARRAY(INTEGER)), CAST(NULL AS BIGINT))),
+                ROW(ROW(CAST(NULL AS ARRAY(INTEGER)), CAST(NULL AS ARRAY(INTEGER)), CAST(NULL AS BIGINT)))
+                """);
+        testPartitionsTableColumnTypes("MAP(INTEGER, INTEGER)", "VALUES (MAP(ARRAY[1,3], ARRAY[2,4]), 'a'), (MAP(ARRAY[1,2], ARRAY[3,4]), 'a'), (MAP(ARRAY[8,3], ARRAY[7,4]), 'b'), (MAP(ARRAY[1,5], ARRAY[2,7]), 'b')", 4, """
+                VALUES
+                ROW(ROW(CAST(NULL AS MAP(INTEGER, INTEGER)), CAST(NULL AS MAP(INTEGER, INTEGER)), CAST(NULL AS BIGINT))),
+                ROW(ROW(CAST(NULL AS MAP(INTEGER, INTEGER)), CAST(NULL AS MAP(INTEGER, INTEGER)), CAST(NULL AS BIGINT)))
+                """);
+        testPartitionsTableColumnTypes("ROW(row_integer_1 INTEGER, row_integer_2 INTEGER)", "VALUES (ROW(1,3), 'a'), (ROW(1,2), 'a'), (ROW(8,3), 'b'), (ROW(1,5), 'b')", 4, """
+                VALUES
+                ROW(ROW(CAST(NULL AS ROW(row_integer_1 INTEGER, row_integer_2 INTEGER)), CAST(NULL AS ROW(row_integer_1 INTEGER, row_integer_2 INTEGER)), CAST(NULL AS BIGINT))),
+                ROW(ROW(CAST(NULL AS ROW(row_integer_1 INTEGER, row_integer_2 INTEGER)), CAST(NULL AS ROW(row_integer_1 INTEGER, row_integer_2 INTEGER)), CAST(NULL AS BIGINT)))
+                """);
+    }
+
+    private void testPartitionsTableColumnTypes(String type, @Language("SQL") String insertIntoValues, int insertIntoValuesCount, @Language("SQL") String expectedDataColumn)
+    {
+        String tableName = "test_partitions_table_data_column_" + randomNameSuffix();
+        try {
+            assertUpdate("CREATE TABLE " + tableName + "(_nonpartition " + type + ", _partition VARCHAR) WITH (partitioned_by = ARRAY['_partition'])");
INTO " + tableName + " " + insertIntoValues, insertIntoValuesCount); + assertThat(query("SELECT CAST(data._nonpartition AS ROW(" + type + "," + type + ", BIGINT)) FROM \"" + tableName + "$partitions\"")).matches(expectedDataColumn); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } + + @Test + public void testPartitionsTableUnpartitioned() + { + String tableName = "test_partitions_table_unpartitioned_" + randomNameSuffix(); + try { + assertUpdate("CREATE TABLE " + tableName + "(_bigint BIGINT, _date DATE)"); + assertUpdate("INSERT INTO " + tableName + " VALUES (0, CAST('2019-09-08' AS DATE)), (1, CAST('2019-09-09' AS DATE)), (2, CAST('2019-09-09' AS DATE))", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, CAST('2019-09-09' AS DATE)), (4, CAST('2019-09-10' AS DATE)), (5, CAST('2019-09-10' AS DATE))", 3); + assertQuery("SELECT count(*) FROM " + tableName, "VALUES 6"); + assertQuery("SELECT count(*) FROM \"" + tableName + "$partitions\"", "VALUES 0"); + assertQueryReturnsEmptyResult("SELECT * FROM \"" + tableName + "$partitions\""); + + assertQuery("SHOW COLUMNS FROM \"" + tableName + "$partitions\"", + """ + VALUES + ('file_count', 'bigint', '', ''), + ('total_size', 'bigint', '', ''), + ('data', 'row(_bigint row(min bigint, max bigint, null_count bigint), _date row(min date, max date, null_count bigint))', '', '') + """); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeTableName.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeTableName.java index 2df36fb8a49a..024fa869da58 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeTableName.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeTableName.java @@ -19,6 +19,7 @@ import static io.trino.plugin.deltalake.DeltaLakeTableType.DATA; import static io.trino.plugin.deltalake.DeltaLakeTableType.HISTORY; +import static io.trino.plugin.deltalake.DeltaLakeTableType.PARTITIONS; import static io.trino.plugin.deltalake.DeltaLakeTableType.PROPERTIES; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.testing.assertions.TrinoExceptionAssert.assertTrinoExceptionThrownBy; @@ -32,6 +33,7 @@ public void testParse() assertParseNameAndType("abc", "abc", DATA); assertParseNameAndType("abc$history", "abc", DeltaLakeTableType.HISTORY); assertParseNameAndType("abc$properties", "abc", DeltaLakeTableType.PROPERTIES); + assertParseNameAndType("abc$partitions", "abc", DeltaLakeTableType.PARTITIONS); assertNoValidTableType("abc$data"); assertInvalid("abc@123", "Invalid Delta Lake table name: abc@123"); @@ -48,6 +50,7 @@ public void testIsDataTable() assertThat(DeltaLakeTableName.isDataTable("abc$data")).isFalse(); // it's invalid assertThat(DeltaLakeTableName.isDataTable("abc$history")).isFalse(); + assertThat(DeltaLakeTableName.isDataTable("abc$partitions")).isFalse(); assertThat(DeltaLakeTableName.isDataTable("abc$invalid")).isFalse(); } @@ -58,6 +61,7 @@ public void testTableNameFrom() assertThat(DeltaLakeTableName.tableNameFrom("abc$data")).isEqualTo("abc"); assertThat(DeltaLakeTableName.tableNameFrom("abc$history")).isEqualTo("abc"); assertThat(DeltaLakeTableName.tableNameFrom("abc$properties")).isEqualTo("abc"); + assertThat(DeltaLakeTableName.tableNameFrom("abc$partitions")).isEqualTo("abc"); 
         assertThat(DeltaLakeTableName.tableNameFrom("abc$invalid")).isEqualTo("abc");
     }
 
@@ -68,6 +72,7 @@ public void testTableTypeFrom()
         assertThat(DeltaLakeTableName.tableTypeFrom("abc$data")).isEqualTo(Optional.empty()); // it's invalid
         assertThat(DeltaLakeTableName.tableTypeFrom("abc$history")).isEqualTo(Optional.of(HISTORY));
         assertThat(DeltaLakeTableName.tableTypeFrom("abc$properties")).isEqualTo(Optional.of(PROPERTIES));
+        assertThat(DeltaLakeTableName.tableTypeFrom("abc$partitions")).isEqualTo(Optional.of(PARTITIONS));
         assertThat(DeltaLakeTableName.tableTypeFrom("abc$invalid")).isEqualTo(Optional.empty());
     }
 
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeSystemTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeSystemTableCompatibility.java
index 1b7ac73454bf..723319275880 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeSystemTableCompatibility.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeSystemTableCompatibility.java
@@ -92,4 +92,66 @@ public void testTablePropertiesWithTableFeatures()
             dropDeltaTableWithRetry("default." + tableName);
         }
     }
+
+    @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
+    public void testTablePartitionsWithDeletionVectors()
+    {
+        String tableName = "test_table_partitions_with_deletion_vectors_" + randomNameSuffix();
+        onDelta().executeQuery("" +
+                "CREATE TABLE default." + tableName +
+                "(a INT, b INT, c VARCHAR(10)) " +
+                "USING delta " +
+                "PARTITIONED BY (a, c) " +
+                "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+                "TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
+
+        List<Row> expectedBeforeDelete = ImmutableList.of(
+                row("{\"a\":1,\"c\":\"varchar_1\"}", 3L, 1347L, "{\"b\":{\"min\":11,\"max\":19,\"null_count\":0}}"),
+                row("{\"a\":1,\"c\":\"varchar_2\"}", 1L, 449L, "{\"b\":{\"min\":13,\"max\":13,\"null_count\":0}}"));
+
+        List<Row> expectedAfterFirstDelete = ImmutableList.of(
+                row("{\"a\":1,\"c\":\"varchar_1\"}", 2L, 898L, "{\"b\":{\"min\":11,\"max\":17,\"null_count\":0}}"),
+                row("{\"a\":1,\"c\":\"varchar_2\"}", 1L, 449L, "{\"b\":{\"min\":13,\"max\":13,\"null_count\":0}}"));
+
+        List<Row> expectedAfterSecondDelete = ImmutableList.of(
+                row("{\"a\":1,\"c\":\"varchar_1\"}", 2L, 898L, "{\"b\":{\"min\":11,\"max\":17,\"null_count\":0}}"));
+
+        try {
+            onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 11, 'varchar_1'), (1, 19, 'varchar_1'), (1, 17, 'varchar_1'), (1, 13, 'varchar_2')");
+            assertThat(onTrino().executeQuery(format("SELECT cast(partition as json), file_count, total_size, cast(data as json) FROM delta.default.\"%s$partitions\"", tableName))).containsOnly(expectedBeforeDelete);
+            onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE b = 19");
+            assertThat(onTrino().executeQuery(format("SELECT cast(partition as json), file_count, total_size, cast(data as json) FROM delta.default.\"%s$partitions\"", tableName))).containsOnly(expectedAfterFirstDelete);
+            onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE c = 'varchar_2'");
+            assertThat(onTrino().executeQuery(format("SELECT cast(partition as json), file_count, total_size, cast(data as json) FROM delta.default.\"%s$partitions\"", tableName))).containsOnly(expectedAfterSecondDelete);
+        }
+        finally {
+            onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName);
+        }
+    }
+
+    @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
+    public void testTablePartitionsWithNoColumnStats()
+    {
+        String tableName = "test_table_partitions_with_no_column_stats_" + randomNameSuffix();
+        onDelta().executeQuery("" +
+                "CREATE TABLE default." + tableName +
+                "(a INT, b INT, c VARCHAR(10)) " +
+                "USING delta " +
+                "PARTITIONED BY (a, c) " +
+                "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+                "TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = 0)");
+
+        List<Row> expected = ImmutableList.of(
+                row("{\"a\":1,\"c\":\"varchar_1\"}", 3L, 1347L, "{\"b\":{\"min\":null,\"max\":null,\"null_count\":null}}"),
+                row("{\"a\":1,\"c\":\"varchar_2\"}", 1L, 449L, "{\"b\":{\"min\":null,\"max\":null,\"null_count\":null}}"),
+                row("{\"a\":1,\"c\":\"varchar_3\"}", 1L, 413L, "{\"b\":{\"min\":null,\"max\":null,\"null_count\":null}}"));
+
+        try {
+            onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 11, 'varchar_1'), (1, 19, 'varchar_1'), (1, 17, 'varchar_1'), (1, 13, 'varchar_2'), (1, NULL, 'varchar_3')");
+            assertThat(onTrino().executeQuery(format("SELECT cast(partition as json), file_count, total_size, cast(data as json) FROM delta.default.\"%s$partitions\"", tableName))).containsOnly(expected);
+        }
+        finally {
+            onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName);
+        }
+    }
 }
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestHiveAndDeltaLakeRedirect.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestHiveAndDeltaLakeRedirect.java
index 51d62b2e3745..08103b4d221e 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestHiveAndDeltaLakeRedirect.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestHiveAndDeltaLakeRedirect.java
@@ -127,33 +127,15 @@ public void testHiveToDeltaRedirectWithDefaultSchemaInSession()
     }
 
     @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
-    public void testHiveToUnpartitionedDeltaPartitionsRedirectFailure()
+    public void testHiveToUnpartitionedDeltaPartitionsRedirect()
     {
         String tableName = "test_delta_lake_unpartitioned_table_" + randomNameSuffix();
 
         onDelta().executeQuery(createTableOnDelta(tableName, false));
 
         try {
-            assertQueryFailure(() -> onTrino().executeQuery(format("SELECT * FROM hive.default.\"%s$partitions\"", tableName)))
-                    .hasMessageMatching(".*Table 'hive.default.\"test_delta_lake_unpartitioned_table_.*\\$partitions\"' redirected to 'delta.default.\"test_delta_lake_unpartitioned_table_.*\\$partitions\"', " +
-                            "but the target table 'delta.default.\"test_delta_lake_unpartitioned_table_.*\\$partitions\"' does not exist");
-        }
-        finally {
-            dropDeltaTableWithRetry(tableName);
-        }
-    }
-
-    @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
-    public void testHiveToPartitionedDeltaPartitionsRedirectFailure()
-    {
-        String tableName = "test_delta_lake_partitioned_table_" + randomNameSuffix();
-
-        onDelta().executeQuery(createTableOnDelta(tableName, true));
-
-        try {
-            assertQueryFailure(() -> onTrino().executeQuery(format("SELECT * FROM hive.default.\"%s$partitions\"", tableName)))
-                    .hasMessageMatching(".*Table 'hive.default.\"test_delta_lake_partitioned_table_.*\\$partitions\"' redirected to 'delta.default.\"test_delta_lake_partitioned_table_.*\\$partitions\"', " +
-                            "but the target table 'delta.default.\"test_delta_lake_partitioned_table_.*\\$partitions\"' does not exist");
+            assertThat(onTrino().executeQuery(format("SELECT * FROM hive.default.\"%s$partitions\"", tableName)))
+                    .hasRowsCount(0);
         }
         finally {
             dropDeltaTableWithRetry(tableName);