diff --git a/docs/src/main/sphinx/connector/delta-lake.rst b/docs/src/main/sphinx/connector/delta-lake.rst index b490f2323d8c..1756c8f3cb76 100644 --- a/docs/src/main/sphinx/connector/delta-lake.rst +++ b/docs/src/main/sphinx/connector/delta-lake.rst @@ -157,16 +157,6 @@ values. Typical usage does not require you to configure them. * - ``delta.register-table-procedure.enabled`` - Enable to allow users to call the ``register_table`` procedure - ``false`` - * - ``delta.default-reader-version`` - - The default reader version used by new tables. - The value can be overridden for a specific table with the - ``reader_version`` table property. - - ``1`` - * - ``delta.default-writer-version`` - - The default writer version used by new tables. - The value can be overridden for a specific table with the - ``writer_version`` table property. - - ``2`` The following table describes performance tuning catalog properties for the connector. @@ -573,21 +563,15 @@ The following properties are available for use: - Set the checkpoint interval in seconds. * - ``change_data_feed_enabled`` - Enables storing change data feed entries. - * - ``reader_version`` - - Set reader version. - * - ``writer_version`` - - Set writer version. -The following example uses all six table properties:: +The following example uses all available table properties:: CREATE TABLE example.default.example_partitioned_table WITH ( location = 's3://my-bucket/a/path', partitioned_by = ARRAY['regionkey'], checkpoint_interval = 5, - change_data_feed_enabled = true, - reader_version = 2, - writer_version = 4 + change_data_feed_enabled = true ) AS SELECT name, comment, regionkey FROM tpch.tiny.nation; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java index 2ebecc975eb6..352b18f6c684 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java @@ -25,7 +25,6 @@ import javax.validation.constraints.DecimalMax; import javax.validation.constraints.DecimalMin; -import javax.validation.constraints.Max; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; @@ -49,12 +48,6 @@ public class DeltaLakeConfig @VisibleForTesting static final DataSize DEFAULT_DATA_FILE_CACHE_SIZE = DataSize.succinctBytes(Math.floorDiv(Runtime.getRuntime().maxMemory(), 10L)); - public static final int MIN_READER_VERSION = 1; - public static final int MIN_WRITER_VERSION = 2; - // The highest reader and writer versions Trino supports writing to - public static final int MAX_READER_VERSION = 2; - public static final int MAX_WRITER_VERSION = 4; - private Duration metadataCacheTtl = new Duration(5, TimeUnit.MINUTES); private long metadataCacheMaxSize = 1000; private DataSize dataFileCacheSize = DEFAULT_DATA_FILE_CACHE_SIZE; @@ -84,8 +77,6 @@ public class DeltaLakeConfig private boolean uniqueTableLocation = true; private boolean legacyCreateTableWithExistingLocationEnabled; private boolean registerTableProcedureEnabled; - private int defaultReaderVersion = MIN_READER_VERSION; - private int defaultWriterVersion = MIN_WRITER_VERSION; public Duration getMetadataCacheTtl() { @@ -484,34 +475,4 @@ public DeltaLakeConfig setRegisterTableProcedureEnabled(boolean registerTablePro this.registerTableProcedureEnabled = registerTableProcedureEnabled; return this; } - - @Min(value = MIN_READER_VERSION, message = "Must be in between " + MIN_READER_VERSION + " and " + MAX_READER_VERSION) - @Max(value = MAX_READER_VERSION, message = "Must be in between " + MIN_READER_VERSION + " and " + MAX_READER_VERSION) - public int getDefaultReaderVersion() - { - return defaultReaderVersion; - } - - @Config("delta.default-reader-version") - @ConfigDescription("The default reader version used by new tables") - public DeltaLakeConfig setDefaultReaderVersion(int defaultReaderVersion) - { - this.defaultReaderVersion = defaultReaderVersion; - return this; - } - - @Min(value = MIN_WRITER_VERSION, message = "Must be in between " + MIN_WRITER_VERSION + " and " + MAX_WRITER_VERSION) - @Max(value = MAX_WRITER_VERSION, message = "Must be in between " + MIN_WRITER_VERSION + " and " + MAX_WRITER_VERSION) - public int getDefaultWriterVersion() - { - return defaultWriterVersion; - } - - @Config("delta.default-writer-version") - @ConfigDescription("The default writer version used by new tables") - public DeltaLakeConfig setDefaultWriterVersion(int defaultWriterVersion) - { - this.defaultWriterVersion = defaultWriterVersion; - return this; - } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 65922784e667..33fbbdd3903c 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -159,6 +159,7 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.MoreCollectors.toOptional; import static com.google.common.collect.Sets.difference; +import static com.google.common.primitives.Ints.max; import static io.trino.plugin.deltalake.DataFileInfo.DataFileType.DATA; import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.getColumnNames; import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.getFilesModifiedAfterProperty; @@ -171,7 +172,6 @@ import static io.trino.plugin.deltalake.DeltaLakeColumnType.PARTITION_KEY; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; import static io.trino.plugin.deltalake.DeltaLakeColumnType.SYNTHESIZED; -import static io.trino.plugin.deltalake.DeltaLakeConfig.MAX_WRITER_VERSION; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getHiveCatalogName; @@ -183,14 +183,10 @@ import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHECKPOINT_INTERVAL_PROPERTY; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.LOCATION_PROPERTY; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.PARTITIONED_BY_PROPERTY; -import static io.trino.plugin.deltalake.DeltaLakeTableProperties.READER_VERSION_PROPERTY; -import static io.trino.plugin.deltalake.DeltaLakeTableProperties.WRITER_VERSION_PROPERTY; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getChangeDataFeedEnabled; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getCheckpointInterval; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getLocation; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getPartitionedBy; -import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getReaderVersion; -import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getWriterVersion; import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_PROPERTY; import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_VALUE; import static io.trino.plugin.deltalake.procedure.DeltaLakeTableProcedureId.OPTIMIZE; @@ -210,6 +206,7 @@ import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeStatsAsJson; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.validateType; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.verifySupportedColumnMapping; +import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY; import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.configurationForNewTable; import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.getMandatoryCurrentVersion; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir; @@ -287,6 +284,10 @@ public class DeltaLakeMetadata public static final String CHANGE_COLUMN_OPERATION = "CHANGE COLUMN"; public static final String ISOLATION_LEVEL = "WriteSerializable"; + public static final int DEFAULT_READER_VERSION = 1; + public static final int DEFAULT_WRITER_VERSION = 2; + // The highest reader and writer versions Trino supports writing to + public static final int MAX_WRITER_VERSION = 4; private static final int CDF_SUPPORTED_WRITER_VERSION = 4; // Matches the dummy column Databricks stores in the metastore @@ -297,7 +298,7 @@ public class DeltaLakeMetadata .add(NUMBER_OF_DISTINCT_VALUES_SUMMARY) .build(); private static final String ENABLE_NON_CONCURRENT_WRITES_CONFIGURATION_KEY = "delta.enable-non-concurrent-writes"; - public static final Set UPDATABLE_TABLE_PROPERTIES = ImmutableSet.of(READER_VERSION_PROPERTY, WRITER_VERSION_PROPERTY); + public static final Set UPDATABLE_TABLE_PROPERTIES = ImmutableSet.of(CHANGE_DATA_FEED_ENABLED_PROPERTY); private final DeltaLakeMetastore metastore; private final TrinoFileSystemFactory fileSystemFactory; @@ -320,8 +321,6 @@ public class DeltaLakeMetadata private final boolean deleteSchemaLocationsFallback; private final boolean useUniqueTableLocation; private final boolean allowManagedTableRename; - private final int defaultReaderVersion; - private final int defaultWriterVersion; public DeltaLakeMetadata( DeltaLakeMetastore metastore, @@ -342,9 +341,7 @@ public DeltaLakeMetadata( DeltaLakeRedirectionsProvider deltaLakeRedirectionsProvider, ExtendedStatisticsAccess statisticsAccess, boolean useUniqueTableLocation, - boolean allowManagedTableRename, - int defaultReaderVersion, - int defaultWriterVersion) + boolean allowManagedTableRename) { this.metastore = requireNonNull(metastore, "metastore is null"); this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); @@ -366,8 +363,6 @@ public DeltaLakeMetadata( this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback; this.useUniqueTableLocation = useUniqueTableLocation; this.allowManagedTableRename = allowManagedTableRename; - this.defaultReaderVersion = defaultReaderVersion; - this.defaultWriterVersion = defaultWriterVersion; } @Override @@ -488,10 +483,6 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect Optional changeDataFeedEnabled = tableHandle.getMetadataEntry().isChangeDataFeedEnabled(); changeDataFeedEnabled.ifPresent(value -> properties.put(CHANGE_DATA_FEED_ENABLED_PROPERTY, value)); - ProtocolEntry protocolEntry = getProtocolEntry(session, tableHandle.getSchemaTableName()); - properties.put(READER_VERSION_PROPERTY, protocolEntry.getMinReaderVersion()); - properties.put(WRITER_VERSION_PROPERTY, protocolEntry.getMinWriterVersion()); - return new ConnectorTableMetadata( tableHandle.getSchemaTableName(), columns, @@ -758,7 +749,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe CREATE_TABLE_OPERATION, session, tableMetadata.getComment(), - getProtocolEntry(tableMetadata.getProperties())); + protocolEntryForNewTable(tableMetadata.getProperties())); setRollback(() -> deleteRecursivelyIfExists(fileSystem, deltaLogDirectory)); transactionLogWriter.flush(); @@ -883,7 +874,7 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, Con external, tableMetadata.getComment(), getChangeDataFeedEnabled(tableMetadata.getProperties()), - getProtocolEntry(tableMetadata.getProperties())); + protocolEntryForNewTable(tableMetadata.getProperties())); } private Optional getSchemaLocation(Database database) @@ -1756,28 +1747,15 @@ private ProtocolEntry getProtocolEntry(ConnectorSession session, SchemaTableName return metastore.getProtocol(session, metastore.getSnapshot(schemaTableName, session)); } - private ProtocolEntry getProtocolEntry(Map properties) + private ProtocolEntry protocolEntryForNewTable(Map properties) { - Optional readerVersion = getReaderVersion(properties); - Optional writerVersion = getWriterVersion(properties); + int writerVersion = DEFAULT_WRITER_VERSION; Optional changeDataFeedEnabled = getChangeDataFeedEnabled(properties); - if (changeDataFeedEnabled.isPresent() && changeDataFeedEnabled.get()) { - if (writerVersion.isPresent()) { - if (writerVersion.get() < CDF_SUPPORTED_WRITER_VERSION) { - throw new TrinoException( - INVALID_TABLE_PROPERTY, - WRITER_VERSION_PROPERTY + " cannot be set less than " + CDF_SUPPORTED_WRITER_VERSION + " when cdf is enabled"); - } - } - else { - // Enabling cdf (change data feed) requires setting the writer version to 4 - writerVersion = Optional.of(CDF_SUPPORTED_WRITER_VERSION); - } + // Enabling cdf (change data feed) requires setting the writer version to 4 + writerVersion = CDF_SUPPORTED_WRITER_VERSION; } - return new ProtocolEntry( - readerVersion.orElse(defaultReaderVersion), - writerVersion.orElse(defaultWriterVersion)); + return new ProtocolEntry(DEFAULT_READER_VERSION, writerVersion); } private void writeCheckpointIfNeeded(ConnectorSession session, SchemaTableName table, Optional checkpointInterval, long newVersion) @@ -1889,39 +1867,35 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle; ProtocolEntry currentProtocolEntry = getProtocolEntry(session, handle.getSchemaTableName()); - Optional readerVersion = Optional.empty(); - if (properties.containsKey(READER_VERSION_PROPERTY)) { - readerVersion = Optional.of((int) properties.get(READER_VERSION_PROPERTY) - .orElseThrow(() -> new IllegalArgumentException("The " + READER_VERSION_PROPERTY + " property cannot be empty"))); - if (readerVersion.get() < currentProtocolEntry.getMinReaderVersion()) { - throw new TrinoException(INVALID_TABLE_PROPERTY, format( - "%s cannot be downgraded from %d to %d", READER_VERSION_PROPERTY, currentProtocolEntry.getMinReaderVersion(), readerVersion.get())); - } - } + long createdTime = Instant.now().toEpochMilli(); - Optional writerVersion = Optional.empty(); - if (properties.containsKey(WRITER_VERSION_PROPERTY)) { - writerVersion = Optional.of((int) properties.get(WRITER_VERSION_PROPERTY) - .orElseThrow(() -> new IllegalArgumentException("The " + WRITER_VERSION_PROPERTY + " property cannot be empty"))); - if (writerVersion.get() < currentProtocolEntry.getMinWriterVersion()) { - throw new TrinoException(INVALID_TABLE_PROPERTY, format( - "%s cannot be downgraded from %d to %d", WRITER_VERSION_PROPERTY, currentProtocolEntry.getMinWriterVersion(), writerVersion.get())); + int requiredWriterVersion = currentProtocolEntry.getMinWriterVersion(); + Optional metadataEntry = Optional.empty(); + if (properties.containsKey(CHANGE_DATA_FEED_ENABLED_PROPERTY)) { + boolean changeDataFeedEnabled = (Boolean) properties.get(CHANGE_DATA_FEED_ENABLED_PROPERTY) + .orElseThrow(() -> new IllegalArgumentException("The change_data_feed_enabled property cannot be empty")); + if (changeDataFeedEnabled) { + requiredWriterVersion = max(requiredWriterVersion, CDF_SUPPORTED_WRITER_VERSION); } + Map configuration = new HashMap<>(handle.getMetadataEntry().getConfiguration()); + configuration.put(DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY, String.valueOf(changeDataFeedEnabled)); + metadataEntry = Optional.of(buildMetadataEntry(handle.getMetadataEntry(), configuration, createdTime)); } long readVersion = handle.getReadVersion(); long commitVersion = readVersion + 1; - ProtocolEntry protocolEntry = new ProtocolEntry( - readerVersion.orElse(currentProtocolEntry.getMinReaderVersion()), - writerVersion.orElse(currentProtocolEntry.getMinWriterVersion())); + Optional protocolEntry = Optional.empty(); + if (requiredWriterVersion != currentProtocolEntry.getMinWriterVersion()) { + protocolEntry = Optional.of(new ProtocolEntry(currentProtocolEntry.getMinReaderVersion(), requiredWriterVersion)); + } try { TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, handle.getLocation()); - - long createdTime = Instant.now().toEpochMilli(); transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, commitVersion, createdTime, SET_TBLPROPERTIES_OPERATION, readVersion)); - transactionLogWriter.appendProtocolEntry(protocolEntry); + protocolEntry.ifPresent(transactionLogWriter::appendProtocolEntry); + + metadataEntry.ifPresent(transactionLogWriter::appendMetadataEntry); transactionLogWriter.flush(); } @@ -1930,6 +1904,19 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta } } + private MetadataEntry buildMetadataEntry(MetadataEntry metadataEntry, Map configuration, long createdTime) + { + return new MetadataEntry( + metadataEntry.getId(), + metadataEntry.getName(), + metadataEntry.getDescription(), + metadataEntry.getFormat(), + metadataEntry.getSchemaString(), + metadataEntry.getOriginalPartitionColumns(), + configuration, + createdTime); + } + @Override public Map getSchemaProperties(ConnectorSession session, CatalogSchemaName schemaName) { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java index 3f942574f80b..9f9bec41701c 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java @@ -58,8 +58,6 @@ public class DeltaLakeMetadataFactory private final long perTransactionMetastoreCacheMaximumSize; private final boolean deleteSchemaLocationsFallback; private final boolean useUniqueTableLocation; - private final int defaultReaderVersion; - private final int defaultWriterVersion; private final boolean allowManagedTableRename; private final String trinoVersion; @@ -102,8 +100,6 @@ public DeltaLakeMetadataFactory( this.perTransactionMetastoreCacheMaximumSize = deltaLakeConfig.getPerTransactionMetastoreCacheMaximumSize(); this.deleteSchemaLocationsFallback = deltaLakeConfig.isDeleteSchemaLocationsFallback(); this.useUniqueTableLocation = deltaLakeConfig.isUniqueTableLocation(); - this.defaultReaderVersion = deltaLakeConfig.getDefaultReaderVersion(); - this.defaultWriterVersion = deltaLakeConfig.getDefaultWriterVersion(); this.allowManagedTableRename = allowManagedTableRename; this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString(); } @@ -145,8 +141,6 @@ public DeltaLakeMetadata create(ConnectorIdentity identity) deltaLakeRedirectionsProvider, statisticsAccess, useUniqueTableLocation, - allowManagedTableRename, - defaultReaderVersion, - defaultWriterVersion); + allowManagedTableRename); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java index a77f7fcb4ea1..243baba55b7b 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java @@ -26,13 +26,8 @@ import java.util.Optional; import static com.google.common.collect.ImmutableList.toImmutableList; -import static io.trino.plugin.deltalake.DeltaLakeConfig.MAX_READER_VERSION; -import static io.trino.plugin.deltalake.DeltaLakeConfig.MAX_WRITER_VERSION; -import static io.trino.plugin.deltalake.DeltaLakeConfig.MIN_READER_VERSION; -import static io.trino.plugin.deltalake.DeltaLakeConfig.MIN_WRITER_VERSION; import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY; import static io.trino.spi.session.PropertyMetadata.booleanProperty; -import static io.trino.spi.session.PropertyMetadata.integerProperty; import static io.trino.spi.session.PropertyMetadata.longProperty; import static io.trino.spi.session.PropertyMetadata.stringProperty; import static io.trino.spi.type.VarcharType.VARCHAR; @@ -45,8 +40,6 @@ public class DeltaLakeTableProperties public static final String PARTITIONED_BY_PROPERTY = "partitioned_by"; public static final String CHECKPOINT_INTERVAL_PROPERTY = "checkpoint_interval"; public static final String CHANGE_DATA_FEED_ENABLED_PROPERTY = "change_data_feed_enabled"; - public static final String READER_VERSION_PROPERTY = "reader_version"; - public static final String WRITER_VERSION_PROPERTY = "writer_version"; private final List> tableProperties; @@ -81,18 +74,6 @@ public DeltaLakeTableProperties() "Enables storing change data feed entries", null, false)) - .add(integerProperty( - READER_VERSION_PROPERTY, - "Reader version", - null, - value -> validateVersion(READER_VERSION_PROPERTY, value, MIN_READER_VERSION, MAX_READER_VERSION), - false)) - .add(integerProperty( - WRITER_VERSION_PROPERTY, - "Writer version", - null, - value -> validateVersion(WRITER_VERSION_PROPERTY, value, MIN_WRITER_VERSION, MAX_WRITER_VERSION), - false)) .build(); } @@ -128,23 +109,4 @@ public static Optional getChangeDataFeedEnabled(Map tab { return Optional.ofNullable((Boolean) tableProperties.get(CHANGE_DATA_FEED_ENABLED_PROPERTY)); } - - public static Optional getReaderVersion(Map tableProperties) - { - return Optional.ofNullable((Integer) tableProperties.get(READER_VERSION_PROPERTY)); - } - - public static Optional getWriterVersion(Map tableProperties) - { - return Optional.ofNullable((Integer) tableProperties.get(WRITER_VERSION_PROPERTY)); - } - - private static void validateVersion(String propertyName, Object value, int minSupportedVersion, int maxSupportedVersion) - { - int version = (int) value; - if (version < minSupportedVersion || version > maxSupportedVersion) { - throw new TrinoException(INVALID_TABLE_PROPERTY, format( - "%s must be between %d and %d", propertyName, minSupportedVersion, maxSupportedVersion)); - } - } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/MetadataEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/MetadataEntry.java index 6d6574f87dc6..9a5cf346a48e 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/MetadataEntry.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/MetadataEntry.java @@ -34,9 +34,9 @@ public class MetadataEntry { public static final String DELTA_CHECKPOINT_WRITE_STATS_AS_JSON_PROPERTY = "delta.checkpoint.writeStatsAsJson"; public static final String DELTA_CHECKPOINT_WRITE_STATS_AS_STRUCT_PROPERTY = "delta.checkpoint.writeStatsAsStruct"; + public static final String DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY = "delta.enableChangeDataFeed"; private static final String DELTA_CHECKPOINT_INTERVAL_PROPERTY = "delta.checkpointInterval"; - private static final String DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY = "delta.enableChangeDataFeed"; private final String id; private final String name; diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java index bbbb237895f5..0bdcb8b89895 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java @@ -347,9 +347,7 @@ public void testShowCreateTable() ")\n" + "WITH (\n" + " location = '%s',\n" + - " partitioned_by = ARRAY['age'],\n" + - " reader_version = 1,\n" + - " writer_version = 2\n" + + " partitioned_by = ARRAY['age']\n" + ")", SCHEMA, getLocationForTable(bucketName, "person"))); @@ -478,9 +476,7 @@ public void testCreatePartitionedTableAs() ")\n" + "WITH (\n" + " location = '%s',\n" + - " partitioned_by = ARRAY['regionkey'],\n" + - " reader_version = 1,\n" + - " writer_version = 2\n" + + " partitioned_by = ARRAY['regionkey']\n" + ")", DELTA_CATALOG, SCHEMA, tableName, getLocationForTable(bucketName, tableName))); assertQuery("SELECT * FROM " + tableName, "SELECT name, regionkey, comment FROM nation"); @@ -1351,9 +1347,7 @@ private void testDeltaLakeTableLocationChanged(boolean fewerEntries, boolean fir ")\n" + "WITH (\n" + " location = '%s',\n" + - " partitioned_by = ARRAY[%s],\n" + - " reader_version = 1,\n" + - " writer_version = 2\n" + + " partitioned_by = ARRAY[%s]\n" + ")", getSession().getCatalog().orElseThrow(), SCHEMA, diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java index ea8cf9b817a9..a7ddc486d15a 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java @@ -261,9 +261,7 @@ public void testShowCreateTable() ")\n" + "WITH (\n" + " location = \\E'.*/test_schema/orders',\n\\Q" + - " partitioned_by = ARRAY[],\n" + - " reader_version = 1,\n" + - " writer_version = 2\n" + + " partitioned_by = ARRAY[]\n" + ")"); } @@ -866,211 +864,39 @@ public void testThatEnableCdfTablePropertyIsShownForCtasTables() } @Test - public void testCreateTableWithInvalidReaderWriterVersion() - { - String tableName = "test_create_table_with_invalid_reader_writer_version_fails_" + randomNameSuffix(); - - assertQueryFails("CREATE TABLE " + tableName + " (a_number INT) WITH (reader_version = 0)", - "Unable to set catalog 'delta_lake' table property 'reader_version' to \\[0]: reader_version must be between 1 and 2"); - assertQueryFails("CREATE TABLE " + tableName + " (a_number INT) WITH (reader_version = 3)", - "Unable to set catalog 'delta_lake' table property 'reader_version' to \\[3]: reader_version must be between 1 and 2"); - assertQueryFails("CREATE TABLE " + tableName + " (a_number INT) WITH (writer_version = 1)", - "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[1]: writer_version must be between 2 and 4"); - assertQueryFails("CREATE TABLE " + tableName + " (a_number INT) WITH (writer_version = 5)", - "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[5]: writer_version must be between 2 and 4"); - } - - @Test - public void testCreateTableWithValidReaderWriterVersion() - { - String tableName = "test_create_table_with_valid_reader_writer_version_" + randomNameSuffix(); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT)"); - assertThatShowCreateTable(tableName, ".*(reader_version = 1,(.*)writer_version = 2).*"); // default reader and writer version - assertUpdate("DROP TABLE " + tableName); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (reader_version = 2)"); - assertThatShowCreateTable(tableName, ".*reader_version = 2.*"); - assertUpdate("DROP TABLE " + tableName); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (writer_version = 4)"); - assertThatShowCreateTable(tableName, ".*writer_version = 4.*"); - assertUpdate("DROP TABLE " + tableName); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (reader_version = 2, writer_version = 3)"); - assertThatShowCreateTable(tableName, ".*(reader_version = 2,(.*)writer_version = 3).*"); - assertUpdate("DROP TABLE " + tableName); - } - - @Test - public void testCreateTableAsWithInvalidReaderWriterVersion() - { - String tableName = "test_create_table_as_with_invalid_reader_writer_version_fails_" + randomNameSuffix(); - - assertQueryFails("CREATE TABLE " + tableName + " (a_number) WITH (reader_version = 0) AS VALUES (1), (2)", - "Unable to set catalog 'delta_lake' table property 'reader_version' to \\[0]: reader_version must be between 1 and 2"); - assertQueryFails("CREATE TABLE " + tableName + " (a_number) WITH (reader_version = 3) AS VALUES (1), (2)", - "Unable to set catalog 'delta_lake' table property 'reader_version' to \\[3]: reader_version must be between 1 and 2"); - assertQueryFails("CREATE TABLE " + tableName + " (a_number) WITH (writer_version = 1) AS VALUES (1), (2)", - "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[1]: writer_version must be between 2 and 4"); - assertQueryFails("CREATE TABLE " + tableName + " (a_number) WITH (writer_version = 5) AS VALUES (1), (2)", - "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[5]: writer_version must be between 2 and 4"); - } - - @Test - public void testCreateTableAsWithValidReaderWriterVersion() - { - String tableName = "test_create_table_as_with_valid_reader_writer_version_" + randomNameSuffix(); - - assertUpdate("CREATE TABLE " + tableName + " (a_number) AS VALUES (1), (2)", 2); - assertThatShowCreateTable(tableName, ".*(reader_version = 1,(.*)writer_version = 2).*"); // default reader and writer version - assertUpdate("DROP TABLE " + tableName); - - assertUpdate("CREATE TABLE " + tableName + " (a_number) WITH (reader_version = 2) AS VALUES (1), (2)", 2); - assertThatShowCreateTable(tableName, ".*reader_version = 2.*"); - assertUpdate("DROP TABLE " + tableName); - - assertUpdate("CREATE TABLE " + tableName + " (a_number) WITH (writer_version = 4) AS VALUES (1), (2)", 2); - assertThatShowCreateTable(tableName, ".*writer_version = 4.*"); - assertUpdate("DROP TABLE " + tableName); - - assertUpdate("CREATE TABLE " + tableName + " (a_number) WITH (reader_version = 2, writer_version = 3) AS VALUES (1), (2)", 2); - assertThatShowCreateTable(tableName, ".*(reader_version = 2,(.*)writer_version = 3).*"); - assertUpdate("DROP TABLE " + tableName); - } - - @Test - public void testCreateTableWithCdfEnabledWriterVersionUpgraded() - { - String tableName = "test_create_table_with_cdf_enabled_writer_version_upgraded_" + randomNameSuffix(); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (change_data_feed_enabled = true)"); - assertThatShowCreateTable(tableName, ".*(change_data_feed_enabled = true,(.*)writer_version = 4).*"); - assertUpdate("DROP TABLE " + tableName); - } - - @Test - public void testCreateTableWithCdfPropertyWriterVersionNotUpgraded() - { - String tableName = "test_create_table_writer_version_not_upgraded_" + randomNameSuffix(); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (change_data_feed_enabled = false, writer_version = 3)"); - assertThatShowCreateTable(tableName, ".*(change_data_feed_enabled = false,(.*)writer_version = 3).*"); - assertUpdate("DROP TABLE " + tableName); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (change_data_feed_enabled = true, writer_version = 4)"); - assertThatShowCreateTable(tableName, ".*(change_data_feed_enabled = true,(.*)writer_version = 4).*"); - assertUpdate("DROP TABLE " + tableName); - } - - @Test - public void testCreateTableAsWithCdfEnabledWriterVersionUpgraded() - { - String tableName = "test_create_table_as_with_cdf_enabled_writer_version_upgraded_" + randomNameSuffix(); - - assertUpdate("CREATE TABLE " + tableName + " (a_number) WITH (change_data_feed_enabled = true) AS VALUES (1), (2)", 2); - assertThatShowCreateTable(tableName, ".*(change_data_feed_enabled = true,(.*)writer_version = 4).*"); - assertUpdate("DROP TABLE " + tableName); - } - - @Test - public void testCreateTableAsWithCdfPropertyWriterVersionNotUpgraded() - { - String tableName = "test_create_table_as_writer_version_not_upgraded_" + randomNameSuffix(); - - assertUpdate("CREATE TABLE " + tableName + " (a_number) WITH (change_data_feed_enabled = false, writer_version = 3) AS VALUES (1), (2)", 2); - assertThatShowCreateTable(tableName, ".*(change_data_feed_enabled = false,(.*)writer_version = 3).*"); - assertUpdate("DROP TABLE " + tableName); - - assertUpdate("CREATE TABLE " + tableName + " (a_number) WITH (change_data_feed_enabled = true, writer_version = 4) AS VALUES (1), (2)", 2); - assertThatShowCreateTable(tableName, ".*(change_data_feed_enabled = true,(.*)writer_version = 4).*"); - assertUpdate("DROP TABLE " + tableName); - } - - @Test - public void testCreateTableWithCdfEnabledAndUnsupportedWriterVersionFails() - { - String tableName = "test_create_table_with_cdf_enabled_and_unsupported_writer_version_" + randomNameSuffix(); - - assertQueryFails("CREATE TABLE " + tableName + " (a_number INT) WITH (change_data_feed_enabled = true, writer_version = 3)", - "writer_version cannot be set less than 4 when cdf is enabled"); - assertQueryFails("CREATE TABLE " + tableName + " (a_number INT) WITH (change_data_feed_enabled = true, writer_version = 5)", - "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[5]: writer_version must be between 2 and 4"); - assertQueryFails("CREATE TABLE " + tableName + " (a_number) WITH (change_data_feed_enabled = true, writer_version = 3) AS VALUES (1), (2)", - "writer_version cannot be set less than 4 when cdf is enabled"); - assertQueryFails("CREATE TABLE " + tableName + " (a_number) WITH (change_data_feed_enabled = true, writer_version = 5) AS VALUES (1), (2)", - "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[5]: writer_version must be between 2 and 4"); - } - - @Test - public void testAlterTableWithInvalidReaderWriterVersion() - { - String tableName = "test_alter_table_with_invalid_reader_writer_version_" + randomNameSuffix(); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT)"); - - assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES reader_version = 0", - "Unable to set catalog 'delta_lake' table property 'reader_version' to \\[0]: reader_version must be between 1 and 2"); - assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES reader_version = 3", - "Unable to set catalog 'delta_lake' table property 'reader_version' to \\[3]: reader_version must be between 1 and 2"); - assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES writer_version = 1", - "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[1]: writer_version must be between 2 and 4"); - assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES writer_version = 5", - "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[5]: writer_version must be between 2 and 4"); - - assertUpdate("DROP TABLE " + tableName); - } - - @Test - public void testAlterTableUpgradeReaderWriterVersion() + public void testAlterTableWithUnsupportedProperties() { - String tableName = "test_alter_table_upgrade_reader_writer_version_" + randomNameSuffix(); + String tableName = "test_alter_table_with_unsupported_properties_" + randomNameSuffix(); assertUpdate("CREATE TABLE " + tableName + " (a_number INT)"); - assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES reader_version = 2"); - assertThatShowCreateTable(tableName, ".*reader_version = 2.*"); - - assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES writer_version = 3"); - assertThatShowCreateTable(tableName, ".*writer_version = 3.*"); - - assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES reader_version = 2, writer_version = 4"); - assertThatShowCreateTable(tableName, ".*(reader_version = 2,(.*)writer_version = 4).*"); + assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES change_data_feed_enabled = true, checkpoint_interval = 10", + "The following properties cannot be updated: checkpoint_interval"); + assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES partitioned_by = ARRAY['a']", + "The following properties cannot be updated: partitioned_by"); assertUpdate("DROP TABLE " + tableName); } @Test - public void testAlterTableDowngradeReaderWriterVersion() + public void testSettingChangeDataFeedEnabledProperty() { - String tableName = "test_alter_table_downgrade_reader_writer_version_" + randomNameSuffix(); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (reader_version = 2, writer_version = 3)"); - - assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES reader_version = 1", - "reader_version cannot be downgraded from 2 to 1"); - assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES writer_version = 2", - "writer_version cannot be downgraded from 3 to 2"); - assertThatShowCreateTable(tableName, ".*(reader_version = 2,(.*)writer_version = 3).*"); - - assertUpdate("DROP TABLE " + tableName); - } + String tableName = "test_enable_and_disable_cdf_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (page_url VARCHAR, domain VARCHAR, views INTEGER)"); - @Test - public void testAlterTableWithUnsupportedProperties() - { - String tableName = "test_alter_table_with_unsupported_properties_" + randomNameSuffix(); + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES change_data_feed_enabled = false"); + assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName)) + .contains("change_data_feed_enabled = false"); - assertUpdate("CREATE TABLE " + tableName + " (a_number INT)"); + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES change_data_feed_enabled = true"); + assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName)).contains("change_data_feed_enabled = true"); - assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES change_data_feed_enabled = true", - "The following properties cannot be updated: change_data_feed_enabled"); - assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES change_data_feed_enabled = true, checkpoint_interval = 10", - "The following properties cannot be updated: change_data_feed_enabled, checkpoint_interval"); - assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES writer_version = 4, partitioned_by = ARRAY['a']", - "The following properties cannot be updated: partitioned_by"); + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES change_data_feed_enabled = false"); + assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName)).contains("change_data_feed_enabled = false"); - assertUpdate("DROP TABLE " + tableName); + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES change_data_feed_enabled = true"); + assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName)) + .contains("change_data_feed_enabled = true"); } @Override diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java index fe4f8c84a6c6..93ee7fade662 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java @@ -19,10 +19,6 @@ import io.trino.plugin.hive.HiveCompressionCodec; import org.testng.annotations.Test; -import javax.validation.constraints.Max; -import javax.validation.constraints.Min; - -import java.lang.annotation.Annotation; import java.util.Map; import java.util.TimeZone; import java.util.concurrent.TimeUnit; @@ -30,7 +26,6 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; -import static io.airlift.testing.ValidationAssertions.assertFailsValidation; import static io.airlift.units.DataSize.Unit.GIGABYTE; import static io.trino.plugin.hive.util.TestHiveUtil.nonDefaultTimeZone; import static java.util.concurrent.TimeUnit.DAYS; @@ -72,9 +67,7 @@ public void testDefaults() .setTargetMaxFileSize(DataSize.of(1, GIGABYTE)) .setUniqueTableLocation(true) .setLegacyCreateTableWithExistingLocationEnabled(false) - .setRegisterTableProcedureEnabled(false) - .setDefaultReaderVersion(1) - .setDefaultWriterVersion(2)); + .setRegisterTableProcedureEnabled(false)); } @Test @@ -110,8 +103,6 @@ public void testExplicitPropertyMappings() .put("delta.unique-table-location", "false") .put("delta.legacy-create-table-with-existing-location.enabled", "true") .put("delta.register-table-procedure.enabled", "true") - .put("delta.default-reader-version", "2") - .put("delta.default-writer-version", "3") .buildOrThrow(); DeltaLakeConfig expected = new DeltaLakeConfig() @@ -143,29 +134,8 @@ public void testExplicitPropertyMappings() .setTargetMaxFileSize(DataSize.of(2, GIGABYTE)) .setUniqueTableLocation(false) .setLegacyCreateTableWithExistingLocationEnabled(true) - .setRegisterTableProcedureEnabled(true) - .setDefaultReaderVersion(2) - .setDefaultWriterVersion(3); + .setRegisterTableProcedureEnabled(true); assertFullMapping(properties, expected); } - - @Test - public void testValidation() - { - assertFailsReaderVersionValidation(new DeltaLakeConfig().setDefaultReaderVersion(0), Min.class); - assertFailsReaderVersionValidation(new DeltaLakeConfig().setDefaultReaderVersion(3), Max.class); - assertFailsWriterVersionValidation(new DeltaLakeConfig().setDefaultWriterVersion(1), Min.class); - assertFailsWriterVersionValidation(new DeltaLakeConfig().setDefaultWriterVersion(5), Max.class); - } - - private void assertFailsReaderVersionValidation(DeltaLakeConfig config, Class annotation) - { - assertFailsValidation(config, "defaultReaderVersion", "Must be in between 1 and 2", annotation); - } - - private void assertFailsWriterVersionValidation(DeltaLakeConfig config, Class annotation) - { - assertFailsValidation(config, "defaultWriterVersion", "Must be in between 2 and 4", annotation); - } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java index 6f9730ff768b..2637cc15f169 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java @@ -53,6 +53,8 @@ import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; +import static io.trino.plugin.deltalake.DeltaLakeMetadata.DEFAULT_READER_VERSION; +import static io.trino.plugin.deltalake.DeltaLakeMetadata.DEFAULT_WRITER_VERSION; import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.trino.spi.type.BigintType.BIGINT; @@ -161,7 +163,7 @@ private static ConnectorPageSink createPageSink(Path outputPath, DeltaLakeWriter true, Optional.empty(), Optional.of(false), - new ProtocolEntry(deltaLakeConfig.getDefaultReaderVersion(), deltaLakeConfig.getDefaultWriterVersion())); + new ProtocolEntry(DEFAULT_READER_VERSION, DEFAULT_WRITER_VERSION)); DeltaLakePageSinkProvider provider = new DeltaLakePageSinkProvider( new GroupByHashPageIndexerFactory(new JoinCompiler(new TypeOperators()), new BlockTypeOperators()), diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksChangeDataFeedCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksChangeDataFeedCompatibility.java index 9efca06aefa1..a4218e90bfa2 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksChangeDataFeedCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksChangeDataFeedCompatibility.java @@ -18,6 +18,7 @@ import com.google.inject.Inject; import com.google.inject.name.Named; import io.trino.tempto.BeforeTestWithContext; +import io.trino.testng.services.Flaky; import org.assertj.core.api.Assertions; import org.testng.annotations.Test; @@ -28,6 +29,8 @@ import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS; import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_73; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; @@ -50,6 +53,7 @@ public void setup() } @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testUpdateTableWithCdf() { String tableName = "test_updates_to_table_with_cdf_" + randomNameSuffix(); @@ -84,6 +88,7 @@ public void testUpdateTableWithCdf() } @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testUpdatePartitionedTableWithCdf() { String tableName = "test_updates_to_partitioned_table_with_cdf_" + randomNameSuffix(); @@ -115,6 +120,7 @@ public void testUpdatePartitionedTableWithCdf() } @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testUpdateTableWithManyRowsInsertedInTheSameQueryAndCdfEnabled() { String tableName = "test_updates_to_table_with_many_rows_inserted_in_one_query_cdf_" + randomNameSuffix(); @@ -142,6 +148,7 @@ public void testUpdateTableWithManyRowsInsertedInTheSameQueryAndCdfEnabled() } @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testUpdatePartitionedTableWithManyRowsInsertedInTheSameRequestAndCdfEnabled() { String tableName = "test_updates_to_partitioned_table_with_many_rows_inserted_in_one_query_cdf_" + randomNameSuffix(); @@ -174,6 +181,7 @@ public void testUpdatePartitionedTableWithManyRowsInsertedInTheSameRequestAndCdf } @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testUpdatePartitionedTableCdfEnabledAndPartitioningColumnUpdated() { String tableName = "test_updates_partitioning_column_in_table_with_cdf_" + randomNameSuffix(); @@ -211,6 +219,7 @@ public void testUpdatePartitionedTableCdfEnabledAndPartitioningColumnUpdated() } @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testUpdateTableWithCdfEnabledAfterTableIsAlreadyCreated() { String tableName = "test_updates_to_table_with_cdf_enabled_later_" + randomNameSuffix(); @@ -262,6 +271,7 @@ public void testUpdateTableWithCdfEnabledAfterTableIsAlreadyCreated() } @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testDeleteFromTableWithCdf() { String tableName = "test_deletes_from_table_with_cdf_" + randomNameSuffix(); @@ -290,6 +300,7 @@ public void testDeleteFromTableWithCdf() } @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testMergeUpdateIntoTableWithCdfEnabled() { String tableName1 = "test_merge_update_into_table_with_cdf_" + randomNameSuffix(); @@ -346,6 +357,7 @@ public void testMergeUpdateIntoTableWithCdfEnabled() } @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testMergeDeleteIntoTableWithCdfEnabled() { String tableName1 = "test_merge_delete_into_table_with_cdf_" + randomNameSuffix(); @@ -400,6 +412,7 @@ public void testMergeDeleteIntoTableWithCdfEnabled() } @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testMergeMixedDeleteAndUpdateIntoTableWithCdfEnabled() { String targetTableName = "test_merge_mixed_delete_and_update_into_table_with_cdf_" + randomNameSuffix(); @@ -461,6 +474,7 @@ public void testMergeMixedDeleteAndUpdateIntoTableWithCdfEnabled() } @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testDeleteFromNullPartitionWithCdfEnabled() { String tableName = "test_delete_from_null_partition_with_cdf_enabled" + randomNameSuffix(); @@ -499,6 +513,61 @@ public void testDeleteFromNullPartitionWithCdfEnabled() } @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testTurningOnAndOffCdfFromTrino() + { + String tableName = "test_turning_cdf_on_and_off_from_trino" + randomNameSuffix(); + try { + onTrino().executeQuery("CREATE TABLE delta.default." + tableName + " (col1 VARCHAR, updated_column INT) " + + "WITH (location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "', change_data_feed_enabled = true)"); + + Assertions.assertThat(onTrino().executeQuery("SHOW CREATE TABLE " + tableName).getOnlyValue().toString()).contains("change_data_feed_enabled = true"); + + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES ('testValue1', 1)"); + onDelta().executeQuery("UPDATE default." + tableName + " SET updated_column = 10 WHERE col1 = 'testValue1'"); + assertThat(onDelta().executeQuery( + "SELECT col1, updated_column, _change_type, _commit_version " + + "FROM table_changes('default." + tableName + "', 0, 2)")) + .containsOnly( + row("testValue1", 1, "insert", 1L), + row("testValue1", 1, "update_preimage", 2L), + row("testValue1", 10, "update_postimage", 2L)); + + onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " SET PROPERTIES change_data_feed_enabled = false"); + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES ('testValue2', 2)"); + onDelta().executeQuery("UPDATE default." + tableName + " SET updated_column = 20 WHERE col1 = 'testValue2'"); + assertQueryFailure(() -> onDelta().executeQuery("SELECT col1, updated_column, _change_type, _commit_version " + + "FROM table_changes('default." + tableName + "', 4, 5)")) + .hasMessageMatching("(?s)(.*Error getting change data for range \\[4 , 5] as change data was not\nrecorded for version \\[4].*)"); + assertThat(onDelta().executeQuery( + "SELECT col1, updated_column, _change_type, _commit_version " + + "FROM table_changes('default." + tableName + "', 0, 2)")) + .containsOnly( + row("testValue1", 1, "insert", 1L), + row("testValue1", 1, "update_preimage", 2L), + row("testValue1", 10, "update_postimage", 2L)); + + onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " SET PROPERTIES change_data_feed_enabled = true"); + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES ('testValue3', 3)"); + onDelta().executeQuery("UPDATE default." + tableName + " SET updated_column = 30 WHERE col1 = 'testValue3'"); + assertThat(onDelta().executeQuery( + "SELECT col1, updated_column, _change_type, _commit_version " + + "FROM table_changes('default." + tableName + "', 7, 8)")) + .containsOnly( + row("testValue3", 3, "insert", 7L), + row("testValue3", 3, "update_preimage", 8L), + row("testValue3", 30, "update_postimage", 8L)); + + assertThat(onDelta().executeQuery("SELECT * FROM " + tableName)) + .containsOnly(row("testValue1", 10), row("testValue2", 20), row("testValue3", 30)); + } + finally { + onTrino().executeQuery("DROP TABLE IF EXISTS delta.default." + tableName); + } + } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testThatCdfDoesntWorkWhenPropertyIsNotSet() { String tableName1 = "test_cdf_doesnt_work_when_property_is_not_set_1_" + randomNameSuffix(); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java index 174d801d9102..30e8d719a978 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java @@ -158,9 +158,7 @@ public void testTrinoUsesCheckpointInterval() "WITH (\n" + " checkpoint_interval = 5,\n" + " location = 's3://%s/%s',\n" + - " partitioned_by = ARRAY['a_number'],\n" + - " reader_version = 1,\n" + - " writer_version = 2\n" + + " partitioned_by = ARRAY['a_number']\n" + ")", tableName, bucketName,