diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ee84edff7ccc..663eac86325f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -848,6 +848,7 @@ jobs: - suite-delta-lake-databricks91 - suite-delta-lake-databricks104 - suite-delta-lake-databricks113 + - suite-delta-lake-databricks122 - suite-gcs - suite-clients - suite-functions @@ -896,6 +897,11 @@ jobs: - suite: suite-delta-lake-databricks113 ignore exclusion if: >- ${{ secrets.DATABRICKS_TOKEN != '' }} + - suite: suite-delta-lake-databricks122 + config: hdp3 + - suite: suite-delta-lake-databricks122 + ignore exclusion if: >- + ${{ secrets.DATABRICKS_TOKEN != '' }} ignore exclusion if: # Do not use this property outside of the matrix configuration. @@ -992,6 +998,7 @@ jobs: DATABRICKS_91_JDBC_URL: DATABRICKS_104_JDBC_URL: DATABRICKS_113_JDBC_URL: + DATABRICKS_122_JDBC_URL: DATABRICKS_LOGIN: DATABRICKS_TOKEN: GCP_CREDENTIALS_KEY: @@ -1062,6 +1069,7 @@ jobs: DATABRICKS_91_JDBC_URL: ${{ secrets.DATABRICKS_91_JDBC_URL }} DATABRICKS_104_JDBC_URL: ${{ secrets.DATABRICKS_104_JDBC_URL }} DATABRICKS_113_JDBC_URL: ${{ secrets.DATABRICKS_113_JDBC_URL }} + DATABRICKS_122_JDBC_URL: ${{ secrets.DATABRICKS_122_JDBC_URL }} DATABRICKS_LOGIN: token DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }} GCP_CREDENTIALS_KEY: ${{ secrets.GCP_CREDENTIALS_KEY }} diff --git a/docs/src/main/sphinx/connector/delta-lake.rst b/docs/src/main/sphinx/connector/delta-lake.rst index 85567cbc7090..bfa0d62a6667 100644 --- a/docs/src/main/sphinx/connector/delta-lake.rst +++ b/docs/src/main/sphinx/connector/delta-lake.rst @@ -16,7 +16,7 @@ Requirements To connect to Databricks Delta Lake, you need: -* Tables written by Databricks Runtime 7.3 LTS, 9.1 LTS, 10.4 LTS and 11.3 LTS are supported. +* Tables written by Databricks Runtime 7.3 LTS, 9.1 LTS, 10.4 LTS, 11.3 LTS and 12.2 LTS are supported. * Deployments using AWS, HDFS, Azure Storage, and Google Cloud Storage (GCS) are fully supported. 
* Network access from the coordinator and workers to the Delta Lake storage. @@ -298,8 +298,8 @@ No other types are supported. Security -------- -The Delta Lake connector allows you to choose one of several means of providing -autorization at the catalog level. You can select a different type of +The Delta Lake connector allows you to choose one of several means of providing +authorization at the catalog level. You can select a different type of authorization check in different Delta Lake catalog files. .. _delta-lake-authorization: @@ -935,4 +935,4 @@ connector. property to ``false`` to disable the optimized parquet reader by default for structural data types. The equivalent catalog session property is ``parquet_optimized_nested_reader_enabled``. - - ``true`` \ No newline at end of file + - ``true`` diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 36d410c31d33..5896a5a79534 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -207,6 +207,7 @@ import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.isAppendOnly; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeSchemaAsJson; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeStatsAsJson; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.unsupportedReaderFeatures; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.validateType; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.verifySupportedColumnMapping; import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY; @@ -283,8 +284,9 @@ public class
DeltaLakeMetadata public static final int DEFAULT_READER_VERSION = 1; public static final int DEFAULT_WRITER_VERSION = 2; - // The highest reader and writer versions Trino supports writing to - public static final int MAX_WRITER_VERSION = 4; + // The highest reader and writer versions Trino supports + private static final int MAX_READER_VERSION = 3; + private static final int MAX_WRITER_VERSION = 4; private static final int CDF_SUPPORTED_WRITER_VERSION = 4; // Matches the dummy column Databricks stores in the metastore @@ -437,6 +439,16 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable } throw e; } + ProtocolEntry protocolEntry = metastore.getProtocol(session, tableSnapshot); + if (protocolEntry.getMinReaderVersion() > MAX_READER_VERSION) { + LOG.debug("Skip %s because the reader version is unsupported: %d", dataTableName, protocolEntry.getMinReaderVersion()); + return null; + } + Set unsupportedReaderFeatures = unsupportedReaderFeatures(protocolEntry.getReaderFeatures().orElse(ImmutableSet.of())); + if (!unsupportedReaderFeatures.isEmpty()) { + LOG.debug("Skip %s because the table contains unsupported reader features: %s", dataTableName, unsupportedReaderFeatures); + return null; + } verifySupportedColumnMapping(getColumnMappingMode(metadataEntry)); return new DeltaLakeTableHandle( dataTableName.getSchemaName(), @@ -1742,7 +1754,7 @@ private ProtocolEntry protocolEntryForNewTable(Map properties) // Enabling cdf (change data feed) requires setting the writer version to 4 writerVersion = CDF_SUPPORTED_WRITER_VERSION; } - return new ProtocolEntry(DEFAULT_READER_VERSION, writerVersion); + return new ProtocolEntry(DEFAULT_READER_VERSION, writerVersion, Optional.empty(), Optional.empty()); } private void writeCheckpointIfNeeded(ConnectorSession session, SchemaTableName table, Optional checkpointInterval, long newVersion) @@ -1886,7 +1898,7 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta Optional 
protocolEntry = Optional.empty(); if (requiredWriterVersion != currentProtocolEntry.getMinWriterVersion()) { - protocolEntry = Optional.of(new ProtocolEntry(currentProtocolEntry.getMinReaderVersion(), requiredWriterVersion)); + protocolEntry = Optional.of(new ProtocolEntry(currentProtocolEntry.getMinReaderVersion(), requiredWriterVersion, currentProtocolEntry.getReaderFeatures(), currentProtocolEntry.getWriterFeatures())); } try { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java index fb4f5f41141e..01fb6004f330 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java @@ -21,6 +21,8 @@ import com.google.common.base.Enums; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import io.airlift.json.ObjectMapperProvider; import io.trino.plugin.deltalake.DeltaLakeColumnHandle; import io.trino.plugin.deltalake.DeltaLakeColumnMetadata; @@ -49,6 +51,7 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalInt; +import java.util.Set; import java.util.function.Function; import static com.google.common.base.Strings.isNullOrEmpty; @@ -81,6 +84,12 @@ private DeltaLakeSchemaSupport() {} public static final String APPEND_ONLY_CONFIGURATION_KEY = "delta.appendOnly"; public static final String COLUMN_MAPPING_MODE_CONFIGURATION_KEY = "delta.columnMapping.mode"; + // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#valid-feature-names-in-table-features + // TODO: Add support for 'deletionVectors' and 'timestampNTZ' reader features + private static final Set 
SUPPORTED_READER_FEATURES = ImmutableSet.builder() + .add("columnMapping") + .build(); + public enum ColumnMappingMode { ID, @@ -467,6 +476,11 @@ private static Map getColumnProperty(String json, Function unsupportedReaderFeatures(Set features) + { + return Sets.difference(features, SUPPORTED_READER_FEATURES); + } + private static Type buildType(TypeManager typeManager, JsonNode typeNode, boolean usePhysicalName) { if (typeNode.isContainerNode()) { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/ProtocolEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/ProtocolEntry.java index 8bb39aa3ac19..47d1dfb47a2d 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/ProtocolEntry.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/ProtocolEntry.java @@ -17,21 +17,39 @@ import com.fasterxml.jackson.annotation.JsonProperty; import java.util.Objects; +import java.util.Optional; +import java.util.Set; import static java.lang.String.format; public class ProtocolEntry { + private static final int MIN_VERSION_SUPPORTS_READER_FEATURES = 3; + private static final int MIN_VERSION_SUPPORTS_WRITER_FEATURES = 7; + private final int minReaderVersion; private final int minWriterVersion; + private final Optional> readerFeatures; + private final Optional> writerFeatures; @JsonCreator public ProtocolEntry( @JsonProperty("minReaderVersion") int minReaderVersion, - @JsonProperty("minWriterVersion") int minWriterVersion) + @JsonProperty("minWriterVersion") int minWriterVersion, + // The delta protocol documentation mentions that readerFeatures & writerFeatures is Array[String], but their actual implementation is Set + @JsonProperty("readerFeatures") Optional> readerFeatures, + @JsonProperty("writerFeatures") Optional> writerFeatures) { this.minReaderVersion = minReaderVersion; this.minWriterVersion = minWriterVersion; + if 
(minReaderVersion < MIN_VERSION_SUPPORTS_READER_FEATURES && readerFeatures.isPresent()) { + throw new IllegalArgumentException("readerFeatures must not exist when minReaderVersion is less than " + MIN_VERSION_SUPPORTS_READER_FEATURES); + } + if (minWriterVersion < MIN_VERSION_SUPPORTS_WRITER_FEATURES && writerFeatures.isPresent()) { + throw new IllegalArgumentException("writerFeatures must not exist when minWriterVersion is less than " + MIN_VERSION_SUPPORTS_WRITER_FEATURES); + } + this.readerFeatures = readerFeatures; + this.writerFeatures = writerFeatures; } @JsonProperty @@ -46,6 +64,18 @@ public int getMinWriterVersion() return minWriterVersion; } + @JsonProperty + public Optional> getReaderFeatures() + { + return readerFeatures; + } + + @JsonProperty + public Optional> getWriterFeatures() + { + return writerFeatures; + } + @Override public boolean equals(Object o) { @@ -57,18 +87,25 @@ public boolean equals(Object o) } ProtocolEntry that = (ProtocolEntry) o; return minReaderVersion == that.minReaderVersion && - minWriterVersion == that.minWriterVersion; + minWriterVersion == that.minWriterVersion && + readerFeatures.equals(that.readerFeatures) && + writerFeatures.equals(that.writerFeatures); } @Override public int hashCode() { - return Objects.hash(minReaderVersion, minWriterVersion); + return Objects.hash(minReaderVersion, minWriterVersion, readerFeatures, writerFeatures); } @Override public String toString() { - return format("ProtocolEntry{minReaderVersion=%d, minWriterVersion=%d}", minReaderVersion, minWriterVersion); + return format( + "ProtocolEntry{minReaderVersion=%d, minWriterVersion=%d, readerFeatures=%s, writerFeatures=%s}", + minReaderVersion, + minWriterVersion, + readerFeatures, + writerFeatures); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java 
b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java index dd256ab281f9..24a698408823 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java @@ -67,6 +67,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.getOnlyElement; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_DATA; @@ -213,7 +214,7 @@ private DeltaLakeColumnHandle buildColumnHandle(EntryType entryType, CheckpointS type = schemaManager.getMetadataEntryType(); break; case PROTOCOL: - type = schemaManager.getProtocolEntryType(); + type = schemaManager.getProtocolEntryType(true, true); break; case COMMIT: type = schemaManager.getCommitInfoEntryType(); @@ -278,16 +279,21 @@ private DeltaLakeTransactionLogEntry buildProtocolEntry(ConnectorSession session if (block.isNull(pagePosition)) { return null; } - int protocolFields = 2; + int minProtocolFields = 2; + int maxProtocolFields = 4; Block protocolEntryBlock = block.getObject(pagePosition, Block.class); log.debug("Block %s has %s fields", block, protocolEntryBlock.getPositionCount()); - if (protocolEntryBlock.getPositionCount() != protocolFields) { + if (protocolEntryBlock.getPositionCount() < minProtocolFields || protocolEntryBlock.getPositionCount() > maxProtocolFields) { throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, - format("Expected block %s to have %d children, but found %s", block, protocolFields, 
protocolEntryBlock.getPositionCount())); + format("Expected block %s to have between %d and %d children, but found %s", block, minProtocolFields, maxProtocolFields, protocolEntryBlock.getPositionCount())); } + // The last entry should be writer feature when protocol entry size is 3 https://github.com/delta-io/delta/blob/master/PROTOCOL.md#disabled-features + int position = 0; ProtocolEntry result = new ProtocolEntry( - getInt(protocolEntryBlock, 0), - getInt(protocolEntryBlock, 1)); + getInt(protocolEntryBlock, position++), + getInt(protocolEntryBlock, position++), + protocolEntryBlock.getPositionCount() == 4 && protocolEntryBlock.isNull(position) ? Optional.empty() : Optional.of(getList(protocolEntryBlock, position++).stream().collect(toImmutableSet())), + protocolEntryBlock.isNull(position) ? Optional.empty() : Optional.of(getList(protocolEntryBlock, position++).stream().collect(toImmutableSet()))); log.debug("Result: %s", result); return DeltaLakeTransactionLogEntry.protocolEntry(result); } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java index 2a29d5dde06d..d98518c15cb6 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java @@ -53,19 +53,16 @@ public class CheckpointSchemaManager RowType.field("deletionTimestamp", BigintType.BIGINT), RowType.field("dataChange", BooleanType.BOOLEAN))); - private static final RowType PROTOCOL_ENTRY_TYPE = RowType.from(ImmutableList.of( - RowType.field("minReaderVersion", IntegerType.INTEGER), - RowType.field("minWriterVersion", IntegerType.INTEGER))); - private final RowType metadataEntryType; private final RowType 
commitInfoEntryType; + private final ArrayType stringList; @Inject public CheckpointSchemaManager(TypeManager typeManager) { this.typeManager = requireNonNull(typeManager, "typeManager is null"); - ArrayType stringList = (ArrayType) this.typeManager.getType(TypeSignature.arrayType(VarcharType.VARCHAR.getTypeSignature())); + stringList = (ArrayType) this.typeManager.getType(TypeSignature.arrayType(VarcharType.VARCHAR.getTypeSignature())); MapType stringMap = (MapType) this.typeManager.getType(TypeSignature.mapType(VarcharType.VARCHAR.getTypeSignature(), VarcharType.VARCHAR.getTypeSignature())); metadataEntryType = RowType.from(ImmutableList.of( @@ -176,9 +173,18 @@ public RowType getTxnEntryType() return TXN_ENTRY_TYPE; } - public RowType getProtocolEntryType() + public RowType getProtocolEntryType(boolean requireReaderFeatures, boolean requireWriterFeatures) { - return PROTOCOL_ENTRY_TYPE; + ImmutableList.Builder fields = ImmutableList.builder(); + fields.add(RowType.field("minReaderVersion", IntegerType.INTEGER)); + fields.add(RowType.field("minWriterVersion", IntegerType.INTEGER)); + if (requireReaderFeatures) { + fields.add(RowType.field("readerFeatures", stringList)); + } + if (requireWriterFeatures) { + fields.add(RowType.field("writerFeatures", stringList)); + } + return RowType.from(fields.build()); } public RowType getCommitInfoEntryType() diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java index dd2396ba3884..376b6f076313 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java @@ -50,6 +50,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static 
com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static io.airlift.slice.Slices.utf8Slice; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.jsonValueToTrinoValue; @@ -96,8 +97,10 @@ public void write(CheckpointEntries entries, TrinoOutputFile outputFile) // The default value is false in https://github.com/delta-io/delta/blob/master/PROTOCOL.md#checkpoint-format, but Databricks defaults to true boolean writeStatsAsStruct = Boolean.parseBoolean(configuration.getOrDefault(DELTA_CHECKPOINT_WRITE_STATS_AS_STRUCT_PROPERTY, "true")); + ProtocolEntry protocolEntry = entries.getProtocolEntry(); + RowType metadataEntryType = checkpointSchemaManager.getMetadataEntryType(); - RowType protocolEntryType = checkpointSchemaManager.getProtocolEntryType(); + RowType protocolEntryType = checkpointSchemaManager.getProtocolEntryType(protocolEntry.getReaderFeatures().isPresent(), protocolEntry.getWriterFeatures().isPresent()); RowType txnEntryType = checkpointSchemaManager.getTxnEntryType(); RowType addEntryType = checkpointSchemaManager.getAddEntryType(entries.getMetadataEntry(), writeStatsAsJson, writeStatsAsStruct); RowType removeEntryType = checkpointSchemaManager.getRemoveEntryType(); @@ -177,8 +180,15 @@ private void writeProtocolEntry(PageBuilder pageBuilder, RowType entryType, Prot pageBuilder.declarePosition(); BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(PROTOCOL_BLOCK_CHANNEL); BlockBuilder entryBlockBuilder = blockBuilder.beginBlockEntry(); - writeLong(entryBlockBuilder, entryType, 0, "minReaderVersion", (long) protocolEntry.getMinReaderVersion()); - writeLong(entryBlockBuilder, entryType, 1, "minWriterVersion", (long) protocolEntry.getMinWriterVersion()); + int fieldId = 0; + writeLong(entryBlockBuilder, entryType, fieldId++, "minReaderVersion", (long) 
protocolEntry.getMinReaderVersion()); + writeLong(entryBlockBuilder, entryType, fieldId++, "minWriterVersion", (long) protocolEntry.getMinWriterVersion()); + if (protocolEntry.getReaderFeatures().isPresent()) { + writeStringList(entryBlockBuilder, entryType, fieldId++, "readerFeatures", protocolEntry.getReaderFeatures().get().stream().collect(toImmutableList())); + } + if (protocolEntry.getWriterFeatures().isPresent()) { + writeStringList(entryBlockBuilder, entryType, fieldId++, "writerFeatures", protocolEntry.getWriterFeatures().get().stream().collect(toImmutableList())); + } blockBuilder.closeEntry(); // null for others diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java index 6aa43d6ab996..8b2a44cd0603 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java @@ -162,7 +162,7 @@ private static ConnectorPageSink createPageSink(String outputPath, DeltaLakeWrit true, Optional.empty(), Optional.of(false), - new ProtocolEntry(DEFAULT_READER_VERSION, DEFAULT_WRITER_VERSION)); + new ProtocolEntry(DEFAULT_READER_VERSION, DEFAULT_WRITER_VERSION, Optional.empty(), Optional.empty())); DeltaLakePageSinkProvider provider = new DeltaLakePageSinkProvider( new GroupByHashPageIndexerFactory(new JoinCompiler(new TypeOperators()), new BlockTypeOperators()), diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestProtocolEntry.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestProtocolEntry.java new file mode 100644 index 000000000000..eaa114aaea36 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestProtocolEntry.java @@ -0,0 +1,83 @@ +/* + * Licensed under the Apache 
License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.transactionlog; + +import com.google.common.collect.ImmutableSet; +import io.airlift.json.JsonCodec; +import org.intellij.lang.annotations.Language; +import org.testng.annotations.Test; + +import java.util.Optional; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.testng.Assert.assertEquals; + +public class TestProtocolEntry +{ + private final JsonCodec codec = JsonCodec.jsonCodec(ProtocolEntry.class); + + @Test + public void testProtocolEntryFromJson() + { + @Language("JSON") + String json = "{\"minReaderVersion\":2,\"minWriterVersion\":5}"; + assertEquals( + codec.fromJson(json), + new ProtocolEntry(2, 5, Optional.empty(), Optional.empty())); + + @Language("JSON") + String jsonWithFeatures = "{\"minReaderVersion\":3,\"minWriterVersion\":7,\"readerFeatures\":[\"deletionVectors\"],\"writerFeatures\":[\"timestampNTZ\"]}"; + assertEquals( + codec.fromJson(jsonWithFeatures), + new ProtocolEntry(3, 7, Optional.of(ImmutableSet.of("deletionVectors")), Optional.of(ImmutableSet.of("timestampNTZ")))); + } + + @Test + public void testInvalidProtocolEntryFromJson() + { + @Language("JSON") + String invalidMinReaderVersion = "{\"minReaderVersion\":2,\"minWriterVersion\":7,\"readerFeatures\":[\"deletionVectors\"]}"; + assertThatThrownBy(() -> codec.fromJson(invalidMinReaderVersion)) + .hasMessageContaining("Invalid JSON string") + 
.hasStackTraceContaining("readerFeatures must not exist when minReaderVersion is less than 3"); + + @Language("JSON") + String invalidMinWriterVersion = "{\"minReaderVersion\":3,\"minWriterVersion\":6,\"writerFeatures\":[\"timestampNTZ\"]}"; + assertThatThrownBy(() -> codec.fromJson(invalidMinWriterVersion)) + .hasMessageContaining("Invalid JSON string") + .hasStackTraceContaining("writerFeatures must not exist when minWriterVersion is less than 7"); + } + + @Test + public void testProtocolEntryToJson() + { + assertEquals( + codec.toJson(new ProtocolEntry(2, 5, Optional.empty(), Optional.empty())), + """ + { + "minReaderVersion" : 2, + "minWriterVersion" : 5 + }"""); + + assertEquals( + codec.toJson(new ProtocolEntry(3, 7, Optional.of(ImmutableSet.of("deletionVectors")), Optional.of(ImmutableSet.of("timestampNTZ")))), + """ + { + "minReaderVersion" : 3, + "minWriterVersion" : 7, + "readerFeatures" : [ "deletionVectors" ], + "writerFeatures" : [ "timestampNTZ" ] + }"""); + } +} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java index 50e124310a8d..e946b503da90 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java @@ -169,7 +169,7 @@ public void readsCheckpointFile() Optional.empty(), null)); - assertThat(entries).element(6).extracting(DeltaLakeTransactionLogEntry::getProtocol).isEqualTo(new ProtocolEntry(1, 2)); + assertThat(entries).element(6).extracting(DeltaLakeTransactionLogEntry::getProtocol).isEqualTo(new ProtocolEntry(1, 2, Optional.empty(), Optional.empty())); assertThat(entries).element(8).extracting(DeltaLakeTransactionLogEntry::getAdd).isEqualTo( new AddFileEntry( diff --git 
a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java index dc36b246e8ea..f19248c7fbb2 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java @@ -44,8 +44,8 @@ public void testCheckpointBuilder() builder.addLogEntry(metadataEntry(metadata1)); builder.addLogEntry(metadataEntry(metadata2)); - ProtocolEntry protocol1 = new ProtocolEntry(1, 2); - ProtocolEntry protocol2 = new ProtocolEntry(3, 4); + ProtocolEntry protocol1 = new ProtocolEntry(1, 2, Optional.empty(), Optional.empty()); + ProtocolEntry protocol2 = new ProtocolEntry(3, 4, Optional.empty(), Optional.empty()); builder.addLogEntry(protocolEntry(protocol1)); builder.addLogEntry(protocolEntry(protocol2)); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java index cfb725ac86f8..a9260e3a5a87 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java @@ -164,7 +164,7 @@ public void testReadAllEntries() assertThat(entries).element(12).extracting(DeltaLakeTransactionLogEntry::getMetaData).isEqualTo(metadataEntry); // ProtocolEntry - assertThat(entries).element(11).extracting(DeltaLakeTransactionLogEntry::getProtocol).isEqualTo(new ProtocolEntry(1, 2)); + 
assertThat(entries).element(11).extracting(DeltaLakeTransactionLogEntry::getProtocol).isEqualTo(new ProtocolEntry(1, 2, Optional.empty(), Optional.empty())); // TransactionEntry // not found in the checkpoint, TODO add a test diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java index 0f63587b1a21..18dde1331b29 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java @@ -120,7 +120,7 @@ public void testCheckpointWriteReadJsonRoundtrip() "configOption1", "blah", "configOption2", "plah"), 1000); - ProtocolEntry protocolEntry = new ProtocolEntry(10, 20); + ProtocolEntry protocolEntry = new ProtocolEntry(10, 20, Optional.empty(), Optional.empty()); TransactionEntry transactionEntry = new TransactionEntry("appId", 1, 1001); AddFileEntry addFileEntryJsonStats = new AddFileEntry( "addFilePathJson", @@ -246,7 +246,7 @@ public void testCheckpointWriteReadParquetStatisticsRoundtrip() "configOption1", "blah", "configOption2", "plah"), 1000); - ProtocolEntry protocolEntry = new ProtocolEntry(10, 20); + ProtocolEntry protocolEntry = new ProtocolEntry(10, 20, Optional.empty(), Optional.empty()); TransactionEntry transactionEntry = new TransactionEntry("appId", 1, 1001); Block[] minMaxRowFieldBlocks = new Block[]{ @@ -365,7 +365,7 @@ public void testDisablingRowStatistics() ImmutableList.of(), ImmutableMap.of(), 1000); - ProtocolEntry protocolEntry = new ProtocolEntry(10, 20); + ProtocolEntry protocolEntry = new ProtocolEntry(10, 20, Optional.empty(), Optional.empty()); Block[] minMaxRowFieldBlocks = new Block[]{ nativeValueToBlock(IntegerType.INTEGER, 1L), 
nativeValueToBlock(createUnboundedVarcharType(), utf8Slice("a")) diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeDeltaLakeDatabricks122.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeDeltaLakeDatabricks122.java new file mode 100644 index 000000000000..0435b226bfb1 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeDeltaLakeDatabricks122.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.tests.product.launcher.env.environment; + +import com.google.inject.Inject; +import io.trino.tests.product.launcher.docker.DockerFiles; +import io.trino.tests.product.launcher.env.common.Standard; +import io.trino.tests.product.launcher.env.common.TestsEnvironment; + +import static java.util.Objects.requireNonNull; + +@TestsEnvironment +public class EnvSinglenodeDeltaLakeDatabricks122 + extends AbstractSinglenodeDeltaLakeDatabricks +{ + @Inject + public EnvSinglenodeDeltaLakeDatabricks122(Standard standard, DockerFiles dockerFiles) + { + super(standard, dockerFiles); + } + + @Override + String databricksTestJdbcUrl() + { + return requireNonNull(System.getenv("DATABRICKS_122_JDBC_URL"), "Environment DATABRICKS_122_JDBC_URL was not set"); + } +} diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteDeltaLakeDatabricks104.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteDeltaLakeDatabricks104.java index ca14a819bdd5..db5d73f287d0 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteDeltaLakeDatabricks104.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteDeltaLakeDatabricks104.java @@ -32,6 +32,7 @@ public List getTestRuns(EnvironmentConfig config) return ImmutableList.of( testOnEnvironment(EnvSinglenodeDeltaLakeDatabricks104.class) .withGroups("configured_features", "delta-lake-databricks") + .withExcludedGroups("delta-lake-exclude-104") .withExcludedTests(getExcludedTests()) .build()); } diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteDeltaLakeDatabricks122.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteDeltaLakeDatabricks122.java new file mode 100644 index 
000000000000..8fab396951f8 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteDeltaLakeDatabricks122.java @@ -0,0 +1,39 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.launcher.suite.suites; + +import com.google.common.collect.ImmutableList; +import io.trino.tests.product.launcher.env.EnvironmentConfig; +import io.trino.tests.product.launcher.env.environment.EnvSinglenodeDeltaLakeDatabricks122; +import io.trino.tests.product.launcher.suite.SuiteDeltaLakeDatabricks; +import io.trino.tests.product.launcher.suite.SuiteTestRun; + +import java.util.List; + +import static io.trino.tests.product.launcher.suite.SuiteTestRun.testOnEnvironment; + +public class SuiteDeltaLakeDatabricks122 + extends SuiteDeltaLakeDatabricks +{ + @Override + public List getTestRuns(EnvironmentConfig config) + { + return ImmutableList.of( + testOnEnvironment(EnvSinglenodeDeltaLakeDatabricks122.class) + .withGroups("configured_features", "delta-lake-databricks") + .withExcludedGroups("delta-lake-exclude-122") + .withExcludedTests(getExcludedTests()) + .build()); + } +} diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java index 21a512e2f3e3..9ef635ebaf83 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java +++ 
b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java @@ -86,7 +86,9 @@ public final class TestGroups public static final String DELTA_LAKE_DATABRICKS = "delta-lake-databricks"; public static final String DELTA_LAKE_EXCLUDE_73 = "delta-lake-exclude-73"; public static final String DELTA_LAKE_EXCLUDE_91 = "delta-lake-exclude-91"; + public static final String DELTA_LAKE_EXCLUDE_104 = "delta-lake-exclude-104"; public static final String DELTA_LAKE_EXCLUDE_113 = "delta-lake-exclude-113"; + public static final String DELTA_LAKE_EXCLUDE_122 = "delta-lake-exclude-122"; public static final String HUDI = "hudi"; public static final String PARQUET = "parquet"; public static final String IGNITE = "ignite"; diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java index ebca92e1b7dc..a39b6b77e273 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java @@ -13,9 +13,9 @@ */ package io.trino.tests.product.deltalake; -import com.google.common.collect.ImmutableList; import io.trino.tempto.assertions.QueryAssert; import io.trino.testng.services.Flaky; +import io.trino.tests.product.deltalake.util.DatabricksVersion; import org.assertj.core.api.Assertions; import org.testng.annotations.Test; @@ -31,6 +31,7 @@ import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_91; import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; +import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_122_RUNTIME_VERSION; import static 
io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_91_RUNTIME_VERSION; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; @@ -418,7 +419,9 @@ public void testIdentityColumn() .contains("b BIGINT GENERATED ALWAYS AS IDENTITY"); onDelta().executeQuery("INSERT INTO default." + tableName + " (a) VALUES (0)"); - List expected = ImmutableList.of(row(0, 1)); + DatabricksVersion databricksRuntimeVersion = getDatabricksRuntimeVersion().orElseThrow(); + // Actual value for IDENTITY column varies between Databricks versions + QueryAssert.Row expected = databricksRuntimeVersion.isOlderThan(DATABRICKS_122_RUNTIME_VERSION) ? row(0, 1) : row(0, 2); assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).containsOnly(expected); assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).containsOnly(expected); } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java index 30e8d719a978..4cf39fe28579 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java @@ -262,7 +262,7 @@ public void testDatabricksUsesCheckpointInterval() private String getDatabricksTablePropertiesWithCheckpointInterval() { - if (databricksRuntimeVersion.equals(DATABRICKS_113_RUNTIME_VERSION)) { + if (databricksRuntimeVersion.isAtLeast(DATABRICKS_113_RUNTIME_VERSION)) { return "TBLPROPERTIES (\n" + " 'delta.checkpointInterval' = '3',\n" + " 'delta.minReaderVersion' = 
'1',\n" + diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java index e1b1ee36d855..f32d46f0d481 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java @@ -33,6 +33,7 @@ import static io.trino.testing.TestingNames.randomNameSuffix; import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS; import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_113; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_122; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.TransactionLogAssertions.assertLastEntryIsCheckpointed; import static io.trino.tests.product.deltalake.TransactionLogAssertions.assertTransactionLogVersion; @@ -222,8 +223,8 @@ public void testReplaceTableWithSchemaChange() } } - // Databricks 11.3 doesn't create a checkpoint file at 'CREATE OR REPLACE TABLE' statement - @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + // Databricks 11.3 and 12.2 don't create a checkpoint file at 'CREATE OR REPLACE TABLE' statement + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_113, DELTA_LAKE_EXCLUDE_122, PROFILE_SPECIFIC_TESTS}) @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testReplaceTableWithSchemaChangeOnCheckpoint() { diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableCompatibility.java 
b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableCompatibility.java index cfd1b3356739..580033e03525 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableCompatibility.java @@ -315,7 +315,7 @@ public void testCreateTableWithColumnCommentOnDelta() private String getDatabricksDefaultTableProperties() { - if (databricksRuntimeVersion.equals(DATABRICKS_113_RUNTIME_VERSION)) { + if (databricksRuntimeVersion.isAtLeast(DATABRICKS_113_RUNTIME_VERSION)) { return "TBLPROPERTIES (\n" + " 'delta.minReaderVersion' = '1',\n" + " 'delta.minWriterVersion' = '2')\n"; diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksDelete.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksDelete.java index b4b545c5f239..fedc2b2d88b6 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksDelete.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksDelete.java @@ -21,10 +21,15 @@ import static io.trino.tempto.assertions.QueryAssert.assertThat; import static io.trino.testing.TestingNames.randomNameSuffix; import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_104; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_113; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_73; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_91; import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static 
io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; @@ -53,4 +58,52 @@ public void testDeleteOnAppendOnlyTableFails() .containsOnly(row(1, 11), row(2, 12)); onTrino().executeQuery("DROP TABLE " + tableName); } + + // Databricks 12.1 added support for deletion vectors + // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeletionVectors() + { + // TODO https://github.com/trinodb/trino/issues/16903 Add support for deletionVectors reader features + String tableName = "test_deletion_vectors_" + randomNameSuffix(); + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + " (a INT, b INT)" + + " USING delta " + + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " + + " TBLPROPERTIES ('delta.enableDeletionVectors' = true)"); + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1,11), (2, 22)"); + onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 2"); + + assertThat(onDelta().executeQuery("SELECT * FROM default." 
+ tableName)) + .containsOnly(row(1, 11)); + + assertThat(onTrino().executeQuery("SHOW TABLES FROM delta.default")) + .contains(row(tableName)); + assertThat(onTrino().executeQuery("SELECT comment FROM information_schema.columns WHERE table_schema = 'default' AND table_name = '" + tableName + "'")) + .hasNoRows(); + assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .hasMessageMatching(".* Table .* does not exist"); + assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM delta.default.\"" + tableName + "$history\"")) + .hasMessageMatching(".* Table .* does not exist"); + assertQueryFailure(() -> onTrino().executeQuery("SHOW COLUMNS FROM delta.default." + tableName)) + .hasMessageMatching(".* Table .* does not exist"); + assertQueryFailure(() -> onTrino().executeQuery("DESCRIBE delta.default." + tableName)) + .hasMessageMatching(".* Table .* does not exist"); + assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (3, 33)")) + .hasMessageMatching(".* Table .* does not exist"); + assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + tableName)) + .hasMessageMatching(".* Table .* does not exist"); + assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a = 3")) + .hasMessageMatching(".* Table .* does not exist"); + assertQueryFailure(() -> onTrino().executeQuery("MERGE INTO delta.default." + tableName + " t USING delta.default." + tableName + " s " + + "ON (t.a = s.a) WHEN MATCHED THEN UPDATE SET b = -1")) + .hasMessageMatching(".* Table .* does not exist"); + } + finally { + dropDeltaTableWithRetry("default." 
+ tableName); + } + } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeProceduresCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeProceduresCompatibility.java index a5896ddafaee..b24cee7d0833 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeProceduresCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeProceduresCompatibility.java @@ -52,7 +52,7 @@ public void testUnregisterTable() assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) .hasMessageMatching(".* Table '.*' does not exist"); assertQueryFailure(() -> onDelta().executeQuery("SELECT * FROM default." + tableName)) - .hasMessageMatching("(?s).* Table or view not found: .*"); + .hasMessageMatching("(?s).*(Table or view not found|The table or view .* cannot be found).*"); } finally { onTrino().executeQuery("DROP TABLE IF EXISTS delta.default." 
+ tableName); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java index d4017e295705..478cfa2ee40c 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java @@ -14,11 +14,13 @@ package io.trino.tests.product.deltalake; import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.inject.Inject; import com.google.inject.name.Named; import io.trino.tempto.BeforeTestWithContext; import io.trino.tempto.assertions.QueryAssert; import io.trino.testng.services.Flaky; +import io.trino.tests.product.deltalake.util.DatabricksVersion; import org.assertj.core.api.Assertions; import org.assertj.core.api.SoftAssertions; import org.testng.annotations.DataProvider; @@ -35,9 +37,11 @@ import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS; import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_73; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; +import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_122_RUNTIME_VERSION; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getDatabricksRuntimeVersion; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static 
io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -367,7 +371,16 @@ private void testVacuumRemoveChangeDataFeedFiles(Consumer vacuumExecutor // https://docs.delta.io/2.1.0/delta-change-data-feed.html#change-data-storage vacuumExecutor.accept(tableName); - Assertions.assertThat(s3.listObjectsV2(bucketName, changeDataPrefix).getObjectSummaries()).hasSize(0); + List summaries = s3.listObjectsV2(bucketName, changeDataPrefix).getObjectSummaries(); + Assertions.assertThat(summaries).hasSizeBetween(0, 1); + if (!summaries.isEmpty()) { + // Databricks version >= 12.2 keep an empty _change_data directory + DatabricksVersion databricksRuntimeVersion = getDatabricksRuntimeVersion().orElseThrow(); + Assertions.assertThat(databricksRuntimeVersion.isAtLeast(DATABRICKS_122_RUNTIME_VERSION)).isTrue(); + S3ObjectSummary summary = summaries.get(0); + Assertions.assertThat(summary.getKey()).endsWith(changeDataPrefix + "/"); + Assertions.assertThat(summary.getSize()).isEqualTo(0); + } } finally { dropDeltaTableWithRetry("default." 
+ tableName); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DatabricksVersion.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DatabricksVersion.java index bcf548ab7143..52063a7fb479 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DatabricksVersion.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DatabricksVersion.java @@ -21,6 +21,7 @@ public record DatabricksVersion(int majorVersion, int minorVersion) implements Comparable { + public static final DatabricksVersion DATABRICKS_122_RUNTIME_VERSION = new DatabricksVersion(12, 2); public static final DatabricksVersion DATABRICKS_113_RUNTIME_VERSION = new DatabricksVersion(11, 3); public static final DatabricksVersion DATABRICKS_104_RUNTIME_VERSION = new DatabricksVersion(10, 4); public static final DatabricksVersion DATABRICKS_91_RUNTIME_VERSION = new DatabricksVersion(9, 1);