diff --git a/docs/src/main/sphinx/connector/delta-lake.rst b/docs/src/main/sphinx/connector/delta-lake.rst index f48caf69b073..69cefd0cea05 100644 --- a/docs/src/main/sphinx/connector/delta-lake.rst +++ b/docs/src/main/sphinx/connector/delta-lake.rst @@ -157,16 +157,6 @@ values. Typical usage does not require you to configure them. * - ``delta.register-table-procedure.enabled`` - Enable to allow users to call the ``register_table`` procedure - ``false`` - * - ``delta.default-reader-version`` - - The default reader version used by new tables. - The value can be overridden for a specific table with the - ``reader_version`` table property. - - ``1`` - * - ``delta.default-writer-version`` - - The default writer version used by new tables. - The value can be overridden for a specific table with the - ``writer_version`` table property. - - ``2`` The following table describes performance tuning catalog properties for the connector. @@ -573,21 +563,15 @@ The following properties are available for use: - Set the checkpoint interval in seconds. * - ``change_data_feed_enabled`` - Enables storing change data feed entries. - * - ``reader_version`` - - Set reader version. - * - ``writer_version`` - - Set writer version. -The following example uses all six table properties:: +The following example uses multiple table properties:: CREATE TABLE example.default.example_partitioned_table WITH ( location = 's3://my-bucket/a/path', partitioned_by = ARRAY['regionkey'], checkpoint_interval = 5, - change_data_feed_enabled = true, - reader_version = 2, - writer_version = 4 + change_data_feed_enabled = true ) AS SELECT name, comment, regionkey FROM tpch.tiny.nation; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java index 2ebecc975eb6..352b18f6c684 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java @@ -25,7 +25,6 @@ import javax.validation.constraints.DecimalMax; import javax.validation.constraints.DecimalMin; -import javax.validation.constraints.Max; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; @@ -49,12 +48,6 @@ public class DeltaLakeConfig @VisibleForTesting static final DataSize DEFAULT_DATA_FILE_CACHE_SIZE = DataSize.succinctBytes(Math.floorDiv(Runtime.getRuntime().maxMemory(), 10L)); - public static final int MIN_READER_VERSION = 1; - public static final int MIN_WRITER_VERSION = 2; - // The highest reader and writer versions Trino supports writing to - public static final int MAX_READER_VERSION = 2; - public static final int MAX_WRITER_VERSION = 4; - private Duration metadataCacheTtl = new Duration(5, TimeUnit.MINUTES); private long metadataCacheMaxSize = 1000; private DataSize dataFileCacheSize = DEFAULT_DATA_FILE_CACHE_SIZE; @@ -84,8 +77,6 @@ public class DeltaLakeConfig private boolean uniqueTableLocation = true; private boolean legacyCreateTableWithExistingLocationEnabled; private boolean registerTableProcedureEnabled; - private int defaultReaderVersion = MIN_READER_VERSION; - private int defaultWriterVersion = MIN_WRITER_VERSION; public Duration getMetadataCacheTtl() { @@ -484,34 +475,4 @@ public DeltaLakeConfig setRegisterTableProcedureEnabled(boolean registerTablePro this.registerTableProcedureEnabled = registerTableProcedureEnabled; return this; } - - @Min(value = MIN_READER_VERSION, message = "Must be in between " + MIN_READER_VERSION + " and " + MAX_READER_VERSION) - @Max(value = MAX_READER_VERSION, message = "Must be in between " + MIN_READER_VERSION + " and " + MAX_READER_VERSION) - public int getDefaultReaderVersion() - { - return defaultReaderVersion; - } - - @Config("delta.default-reader-version") - @ConfigDescription("The default reader version used by new tables") - public DeltaLakeConfig setDefaultReaderVersion(int defaultReaderVersion) - { - this.defaultReaderVersion = defaultReaderVersion; - return this; - } - - @Min(value = MIN_WRITER_VERSION, message = "Must be in between " + MIN_WRITER_VERSION + " and " + MAX_WRITER_VERSION) - @Max(value = MAX_WRITER_VERSION, message = "Must be in between " + MIN_WRITER_VERSION + " and " + MAX_WRITER_VERSION) - public int getDefaultWriterVersion() - { - return defaultWriterVersion; - } - - @Config("delta.default-writer-version") - @ConfigDescription("The default writer version used by new tables") - public DeltaLakeConfig setDefaultWriterVersion(int defaultWriterVersion) - { - this.defaultWriterVersion = defaultWriterVersion; - return this; - } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 65922784e667..bedb60a43251 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -158,7 +158,6 @@ import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.MoreCollectors.toOptional; -import static com.google.common.collect.Sets.difference; import static io.trino.plugin.deltalake.DataFileInfo.DataFileType.DATA; import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.getColumnNames; import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.getFilesModifiedAfterProperty; @@ -171,7 +170,6 @@ import static io.trino.plugin.deltalake.DeltaLakeColumnType.PARTITION_KEY; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; import static io.trino.plugin.deltalake.DeltaLakeColumnType.SYNTHESIZED; -import static io.trino.plugin.deltalake.DeltaLakeConfig.MAX_WRITER_VERSION; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getHiveCatalogName; @@ -183,14 +181,10 @@ import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHECKPOINT_INTERVAL_PROPERTY; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.LOCATION_PROPERTY; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.PARTITIONED_BY_PROPERTY; -import static io.trino.plugin.deltalake.DeltaLakeTableProperties.READER_VERSION_PROPERTY; -import static io.trino.plugin.deltalake.DeltaLakeTableProperties.WRITER_VERSION_PROPERTY; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getChangeDataFeedEnabled; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getCheckpointInterval; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getLocation; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getPartitionedBy; -import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getReaderVersion; -import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getWriterVersion; import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_PROPERTY; import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_VALUE; import static io.trino.plugin.deltalake.procedure.DeltaLakeTableProcedureId.OPTIMIZE; @@ -287,6 +281,11 @@ public class DeltaLakeMetadata public static final String CHANGE_COLUMN_OPERATION = "CHANGE COLUMN"; public static final String ISOLATION_LEVEL = "WriteSerializable"; + public static final int DEFAULT_READER_VERSION = 1; + public static final int DEFAULT_WRITER_VERSION = 2; + // The highest reader and writer versions Trino supports writing to + public static final int MAX_WRITER_VERSION = 4; + private static final int CDF_SUPPORTED_WRITER_VERSION = 4; // Matches the dummy column Databricks stores in the metastore @@ -297,7 +296,6 @@ public class DeltaLakeMetadata .add(NUMBER_OF_DISTINCT_VALUES_SUMMARY) .build(); private static final String ENABLE_NON_CONCURRENT_WRITES_CONFIGURATION_KEY = "delta.enable-non-concurrent-writes"; - public static final Set UPDATABLE_TABLE_PROPERTIES = ImmutableSet.of(READER_VERSION_PROPERTY, WRITER_VERSION_PROPERTY); private final DeltaLakeMetastore metastore; private final TrinoFileSystemFactory fileSystemFactory; @@ -320,8 +318,6 @@ public class DeltaLakeMetadata private final boolean deleteSchemaLocationsFallback; private final boolean useUniqueTableLocation; private final boolean allowManagedTableRename; - private final int defaultReaderVersion; - private final int defaultWriterVersion; public DeltaLakeMetadata( DeltaLakeMetastore metastore, @@ -342,9 +338,7 @@ public DeltaLakeMetadata( DeltaLakeRedirectionsProvider deltaLakeRedirectionsProvider, ExtendedStatisticsAccess statisticsAccess, boolean useUniqueTableLocation, - boolean allowManagedTableRename, - int defaultReaderVersion, - int defaultWriterVersion) + boolean allowManagedTableRename) { this.metastore = requireNonNull(metastore, "metastore is null"); this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); @@ -366,8 +360,6 @@ public DeltaLakeMetadata( this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback; this.useUniqueTableLocation = useUniqueTableLocation; this.allowManagedTableRename = allowManagedTableRename; - this.defaultReaderVersion = defaultReaderVersion; - this.defaultWriterVersion = defaultWriterVersion; } @Override @@ -488,10 +480,6 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect Optional changeDataFeedEnabled = tableHandle.getMetadataEntry().isChangeDataFeedEnabled(); changeDataFeedEnabled.ifPresent(value -> properties.put(CHANGE_DATA_FEED_ENABLED_PROPERTY, value)); - ProtocolEntry protocolEntry = getProtocolEntry(session, tableHandle.getSchemaTableName()); - properties.put(READER_VERSION_PROPERTY, protocolEntry.getMinReaderVersion()); - properties.put(WRITER_VERSION_PROPERTY, protocolEntry.getMinWriterVersion()); - return new ConnectorTableMetadata( tableHandle.getSchemaTableName(), columns, @@ -1758,26 +1746,13 @@ private ProtocolEntry getProtocolEntry(ConnectorSession session, SchemaTableName private ProtocolEntry getProtocolEntry(Map properties) { - Optional readerVersion = getReaderVersion(properties); - Optional writerVersion = getWriterVersion(properties); - Optional changeDataFeedEnabled = getChangeDataFeedEnabled(properties); - - if (changeDataFeedEnabled.isPresent() && changeDataFeedEnabled.get()) { - if (writerVersion.isPresent()) { - if (writerVersion.get() < CDF_SUPPORTED_WRITER_VERSION) { - throw new TrinoException( - INVALID_TABLE_PROPERTY, - WRITER_VERSION_PROPERTY + " cannot be set less than " + CDF_SUPPORTED_WRITER_VERSION + " when cdf is enabled"); - } - } - else { - // Enabling cdf (change data feed) requires setting the writer version to 4 - writerVersion = Optional.of(CDF_SUPPORTED_WRITER_VERSION); - } + int readerVersion = DEFAULT_READER_VERSION; + int writerVersion = DEFAULT_WRITER_VERSION; + if (getChangeDataFeedEnabled(properties).orElse(false)) { + writerVersion = CDF_SUPPORTED_WRITER_VERSION; } - return new ProtocolEntry( - readerVersion.orElse(defaultReaderVersion), - writerVersion.orElse(defaultWriterVersion)); + + return new ProtocolEntry(readerVersion, writerVersion); } private void writeCheckpointIfNeeded(ConnectorSession session, SchemaTableName table, Optional checkpointInterval, long newVersion) @@ -1878,58 +1853,6 @@ private CommitInfoEntry getCommitInfoEntry( Optional.of(true)); } - @Override - public void setTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle, Map> properties) - { - Set unsupportedProperties = difference(properties.keySet(), UPDATABLE_TABLE_PROPERTIES); - if (!unsupportedProperties.isEmpty()) { - throw new TrinoException(NOT_SUPPORTED, "The following properties cannot be updated: " + String.join(", ", unsupportedProperties)); - } - - DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle; - ProtocolEntry currentProtocolEntry = getProtocolEntry(session, handle.getSchemaTableName()); - - Optional readerVersion = Optional.empty(); - if (properties.containsKey(READER_VERSION_PROPERTY)) { - readerVersion = Optional.of((int) properties.get(READER_VERSION_PROPERTY) - .orElseThrow(() -> new IllegalArgumentException("The " + READER_VERSION_PROPERTY + " property cannot be empty"))); - if (readerVersion.get() < currentProtocolEntry.getMinReaderVersion()) { - throw new TrinoException(INVALID_TABLE_PROPERTY, format( - "%s cannot be downgraded from %d to %d", READER_VERSION_PROPERTY, currentProtocolEntry.getMinReaderVersion(), readerVersion.get())); - } - } - - Optional writerVersion = Optional.empty(); - if (properties.containsKey(WRITER_VERSION_PROPERTY)) { - writerVersion = Optional.of((int) properties.get(WRITER_VERSION_PROPERTY) - .orElseThrow(() -> new IllegalArgumentException("The " + WRITER_VERSION_PROPERTY + " property cannot be empty"))); - if (writerVersion.get() < currentProtocolEntry.getMinWriterVersion()) { - throw new TrinoException(INVALID_TABLE_PROPERTY, format( - "%s cannot be downgraded from %d to %d", WRITER_VERSION_PROPERTY, currentProtocolEntry.getMinWriterVersion(), writerVersion.get())); - } - } - - long readVersion = handle.getReadVersion(); - long commitVersion = readVersion + 1; - - ProtocolEntry protocolEntry = new ProtocolEntry( - readerVersion.orElse(currentProtocolEntry.getMinReaderVersion()), - writerVersion.orElse(currentProtocolEntry.getMinWriterVersion())); - - try { - TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, handle.getLocation()); - - long createdTime = Instant.now().toEpochMilli(); - transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, commitVersion, createdTime, SET_TBLPROPERTIES_OPERATION, readVersion)); - transactionLogWriter.appendProtocolEntry(protocolEntry); - - transactionLogWriter.flush(); - } - catch (IOException e) { - throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Failed to write Delta Lake transaction log entry", e); - } - } - @Override public Map getSchemaProperties(ConnectorSession session, CatalogSchemaName schemaName) { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java index 3f942574f80b..9f9bec41701c 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java @@ -58,8 +58,6 @@ public class DeltaLakeMetadataFactory private final long perTransactionMetastoreCacheMaximumSize; private final boolean deleteSchemaLocationsFallback; private final boolean useUniqueTableLocation; - private final int defaultReaderVersion; - private final int defaultWriterVersion; private final boolean allowManagedTableRename; private final String trinoVersion; @@ -102,8 +100,6 @@ public DeltaLakeMetadataFactory( this.perTransactionMetastoreCacheMaximumSize = deltaLakeConfig.getPerTransactionMetastoreCacheMaximumSize(); this.deleteSchemaLocationsFallback = deltaLakeConfig.isDeleteSchemaLocationsFallback(); this.useUniqueTableLocation = deltaLakeConfig.isUniqueTableLocation(); - this.defaultReaderVersion = deltaLakeConfig.getDefaultReaderVersion(); - this.defaultWriterVersion = deltaLakeConfig.getDefaultWriterVersion(); this.allowManagedTableRename = allowManagedTableRename; this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString(); } @@ -145,8 +141,6 @@ public DeltaLakeMetadata create(ConnectorIdentity identity) deltaLakeRedirectionsProvider, statisticsAccess, useUniqueTableLocation, - allowManagedTableRename, - defaultReaderVersion, - defaultWriterVersion); + allowManagedTableRename); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java index a77f7fcb4ea1..243baba55b7b 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java @@ -26,13 +26,8 @@ import java.util.Optional; import static com.google.common.collect.ImmutableList.toImmutableList; -import static io.trino.plugin.deltalake.DeltaLakeConfig.MAX_READER_VERSION; -import static io.trino.plugin.deltalake.DeltaLakeConfig.MAX_WRITER_VERSION; -import static io.trino.plugin.deltalake.DeltaLakeConfig.MIN_READER_VERSION; -import static io.trino.plugin.deltalake.DeltaLakeConfig.MIN_WRITER_VERSION; import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY; import static io.trino.spi.session.PropertyMetadata.booleanProperty; -import static io.trino.spi.session.PropertyMetadata.integerProperty; import static io.trino.spi.session.PropertyMetadata.longProperty; import static io.trino.spi.session.PropertyMetadata.stringProperty; import static io.trino.spi.type.VarcharType.VARCHAR; @@ -45,8 +40,6 @@ public class DeltaLakeTableProperties public static final String PARTITIONED_BY_PROPERTY = "partitioned_by"; public static final String CHECKPOINT_INTERVAL_PROPERTY = "checkpoint_interval"; public static final String CHANGE_DATA_FEED_ENABLED_PROPERTY = "change_data_feed_enabled"; - public static final String READER_VERSION_PROPERTY = "reader_version"; - public static final String WRITER_VERSION_PROPERTY = "writer_version"; private final List> tableProperties; @@ -81,18 +74,6 @@ public DeltaLakeTableProperties() "Enables storing change data feed entries", null, false)) - .add(integerProperty( - READER_VERSION_PROPERTY, - "Reader version", - null, - value -> validateVersion(READER_VERSION_PROPERTY, value, MIN_READER_VERSION, MAX_READER_VERSION), - false)) - .add(integerProperty( - WRITER_VERSION_PROPERTY, - "Writer version", - null, - value -> validateVersion(WRITER_VERSION_PROPERTY, value, MIN_WRITER_VERSION, MAX_WRITER_VERSION), - false)) .build(); } @@ -128,23 +109,4 @@ public static Optional getChangeDataFeedEnabled(Map tab { return Optional.ofNullable((Boolean) tableProperties.get(CHANGE_DATA_FEED_ENABLED_PROPERTY)); } - - public static Optional getReaderVersion(Map tableProperties) - { - return Optional.ofNullable((Integer) tableProperties.get(READER_VERSION_PROPERTY)); - } - - public static Optional getWriterVersion(Map tableProperties) - { - return Optional.ofNullable((Integer) tableProperties.get(WRITER_VERSION_PROPERTY)); - } - - private static void validateVersion(String propertyName, Object value, int minSupportedVersion, int maxSupportedVersion) - { - int version = (int) value; - if (version < minSupportedVersion || version > maxSupportedVersion) { - throw new TrinoException(INVALID_TABLE_PROPERTY, format( - "%s must be between %d and %d", propertyName, minSupportedVersion, maxSupportedVersion)); - } - } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java index bbbb237895f5..0bdcb8b89895 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java @@ -347,9 +347,7 @@ public void testShowCreateTable() ")\n" + "WITH (\n" + " location = '%s',\n" + - " partitioned_by = ARRAY['age'],\n" + - " reader_version = 1,\n" + - " writer_version = 2\n" + + " partitioned_by = ARRAY['age']\n" + ")", SCHEMA, getLocationForTable(bucketName, "person"))); @@ -478,9 +476,7 @@ public void testCreatePartitionedTableAs() ")\n" + "WITH (\n" + " location = '%s',\n" + - " partitioned_by = ARRAY['regionkey'],\n" + - " reader_version = 1,\n" + - " writer_version = 2\n" + + " partitioned_by = ARRAY['regionkey']\n" + ")", DELTA_CATALOG, SCHEMA, tableName, getLocationForTable(bucketName, tableName))); assertQuery("SELECT * FROM " + tableName, "SELECT name, regionkey, comment FROM nation"); @@ -1351,9 +1347,7 @@ private void testDeltaLakeTableLocationChanged(boolean fewerEntries, boolean fir ")\n" + "WITH (\n" + " location = '%s',\n" + - " partitioned_by = ARRAY[%s],\n" + - " reader_version = 1,\n" + - " writer_version = 2\n" + + " partitioned_by = ARRAY[%s]\n" + ")", getSession().getCatalog().orElseThrow(), SCHEMA, diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java index ea8cf9b817a9..121953dc3f4f 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java @@ -261,9 +261,7 @@ public void testShowCreateTable() ")\n" + "WITH (\n" + " location = \\E'.*/test_schema/orders',\n\\Q" + - " partitioned_by = ARRAY[],\n" + - " reader_version = 1,\n" + - " writer_version = 2\n" + + " partitioned_by = ARRAY[]\n" + ")"); } @@ -865,214 +863,6 @@ public void testThatEnableCdfTablePropertyIsShownForCtasTables() .contains("change_data_feed_enabled = true"); } - @Test - public void testCreateTableWithInvalidReaderWriterVersion() - { - String tableName = "test_create_table_with_invalid_reader_writer_version_fails_" + randomNameSuffix(); - - assertQueryFails("CREATE TABLE " + tableName + " (a_number INT) WITH (reader_version = 0)", - "Unable to set catalog 'delta_lake' table property 'reader_version' to \\[0]: reader_version must be between 1 and 2"); - assertQueryFails("CREATE TABLE " + tableName + " (a_number INT) WITH (reader_version = 3)", - "Unable to set catalog 'delta_lake' table property 'reader_version' to \\[3]: reader_version must be between 1 and 2"); - assertQueryFails("CREATE TABLE " + tableName + " (a_number INT) WITH (writer_version = 1)", - "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[1]: writer_version must be between 2 and 4"); - assertQueryFails("CREATE TABLE " + tableName + " (a_number INT) WITH (writer_version = 5)", - "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[5]: writer_version must be between 2 and 4"); - } - - @Test - public void testCreateTableWithValidReaderWriterVersion() - { - String tableName = "test_create_table_with_valid_reader_writer_version_" + randomNameSuffix(); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT)"); - assertThatShowCreateTable(tableName, ".*(reader_version = 1,(.*)writer_version = 2).*"); // default reader and writer version - assertUpdate("DROP TABLE " + tableName); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (reader_version = 2)"); - assertThatShowCreateTable(tableName, ".*reader_version = 2.*"); - assertUpdate("DROP TABLE " + tableName); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (writer_version = 4)"); - assertThatShowCreateTable(tableName, ".*writer_version = 4.*"); - assertUpdate("DROP TABLE " + tableName); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (reader_version = 2, writer_version = 3)"); - assertThatShowCreateTable(tableName, ".*(reader_version = 2,(.*)writer_version = 3).*"); - assertUpdate("DROP TABLE " + tableName); - } - - @Test - public void testCreateTableAsWithInvalidReaderWriterVersion() - { - String tableName = "test_create_table_as_with_invalid_reader_writer_version_fails_" + randomNameSuffix(); - - assertQueryFails("CREATE TABLE " + tableName + " (a_number) WITH (reader_version = 0) AS VALUES (1), (2)", - "Unable to set catalog 'delta_lake' table property 'reader_version' to \\[0]: reader_version must be between 1 and 2"); - assertQueryFails("CREATE TABLE " + tableName + " (a_number) WITH (reader_version = 3) AS VALUES (1), (2)", - "Unable to set catalog 'delta_lake' table property 'reader_version' to \\[3]: reader_version must be between 1 and 2"); - assertQueryFails("CREATE TABLE " + tableName + " (a_number) WITH (writer_version = 1) AS VALUES (1), (2)", - "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[1]: writer_version must be between 2 and 4"); - assertQueryFails("CREATE TABLE " + tableName + " (a_number) WITH (writer_version = 5) AS VALUES (1), (2)", - "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[5]: writer_version must be between 2 and 4"); - } - - @Test - public void testCreateTableAsWithValidReaderWriterVersion() - { - String tableName = "test_create_table_as_with_valid_reader_writer_version_" + randomNameSuffix(); - - assertUpdate("CREATE TABLE " + tableName + " (a_number) AS VALUES (1), (2)", 2); - assertThatShowCreateTable(tableName, ".*(reader_version = 1,(.*)writer_version = 2).*"); // default reader and writer version - assertUpdate("DROP TABLE " + tableName); - - assertUpdate("CREATE TABLE " + tableName + " (a_number) WITH (reader_version = 2) AS VALUES (1), (2)", 2); - assertThatShowCreateTable(tableName, ".*reader_version = 2.*"); - assertUpdate("DROP TABLE " + tableName); - - assertUpdate("CREATE TABLE " + tableName + " (a_number) WITH (writer_version = 4) AS VALUES (1), (2)", 2); - assertThatShowCreateTable(tableName, ".*writer_version = 4.*"); - assertUpdate("DROP TABLE " + tableName); - - assertUpdate("CREATE TABLE " + tableName + " (a_number) WITH (reader_version = 2, writer_version = 3) AS VALUES (1), (2)", 2); - assertThatShowCreateTable(tableName, ".*(reader_version = 2,(.*)writer_version = 3).*"); - assertUpdate("DROP TABLE " + tableName); - } - - @Test - public void testCreateTableWithCdfEnabledWriterVersionUpgraded() - { - String tableName = "test_create_table_with_cdf_enabled_writer_version_upgraded_" + randomNameSuffix(); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (change_data_feed_enabled = true)"); - assertThatShowCreateTable(tableName, ".*(change_data_feed_enabled = true,(.*)writer_version = 4).*"); - assertUpdate("DROP TABLE " + tableName); - } - - @Test - public void testCreateTableWithCdfPropertyWriterVersionNotUpgraded() - { - String tableName = "test_create_table_writer_version_not_upgraded_" + randomNameSuffix(); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (change_data_feed_enabled = false, writer_version = 3)"); - assertThatShowCreateTable(tableName, ".*(change_data_feed_enabled = false,(.*)writer_version = 3).*"); - assertUpdate("DROP TABLE " + tableName); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (change_data_feed_enabled = true, writer_version = 4)"); - assertThatShowCreateTable(tableName, ".*(change_data_feed_enabled = true,(.*)writer_version = 4).*"); - assertUpdate("DROP TABLE " + tableName); - } - - @Test - public void testCreateTableAsWithCdfEnabledWriterVersionUpgraded() - { - String tableName = "test_create_table_as_with_cdf_enabled_writer_version_upgraded_" + randomNameSuffix(); - - assertUpdate("CREATE TABLE " + tableName + " (a_number) WITH (change_data_feed_enabled = true) AS VALUES (1), (2)", 2); - assertThatShowCreateTable(tableName, ".*(change_data_feed_enabled = true,(.*)writer_version = 4).*"); - assertUpdate("DROP TABLE " + tableName); - } - - @Test - public void testCreateTableAsWithCdfPropertyWriterVersionNotUpgraded() - { - String tableName = "test_create_table_as_writer_version_not_upgraded_" + randomNameSuffix(); - - assertUpdate("CREATE TABLE " + tableName + " (a_number) WITH (change_data_feed_enabled = false, writer_version = 3) AS VALUES (1), (2)", 2); - assertThatShowCreateTable(tableName, ".*(change_data_feed_enabled = false,(.*)writer_version = 3).*"); - assertUpdate("DROP TABLE " + tableName); - - assertUpdate("CREATE TABLE " + tableName + " (a_number) WITH (change_data_feed_enabled = true, writer_version = 4) AS VALUES (1), (2)", 2); - assertThatShowCreateTable(tableName, ".*(change_data_feed_enabled = true,(.*)writer_version = 4).*"); - assertUpdate("DROP TABLE " + tableName); - } - - @Test - public void testCreateTableWithCdfEnabledAndUnsupportedWriterVersionFails() - { - String tableName = "test_create_table_with_cdf_enabled_and_unsupported_writer_version_" + randomNameSuffix(); - - assertQueryFails("CREATE TABLE " + tableName + " (a_number INT) WITH (change_data_feed_enabled = true, writer_version = 3)", - "writer_version cannot be set less than 4 when cdf is enabled"); - assertQueryFails("CREATE TABLE " + tableName + " (a_number INT) WITH (change_data_feed_enabled = true, writer_version = 5)", - "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[5]: writer_version must be between 2 and 4"); - assertQueryFails("CREATE TABLE " + tableName + " (a_number) WITH (change_data_feed_enabled = true, writer_version = 3) AS VALUES (1), (2)", - "writer_version cannot be set less than 4 when cdf is enabled"); - assertQueryFails("CREATE TABLE " + tableName + " (a_number) WITH (change_data_feed_enabled = true, writer_version = 5) AS VALUES (1), (2)", - "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[5]: writer_version must be between 2 and 4"); - } - - @Test - public void testAlterTableWithInvalidReaderWriterVersion() - { - String tableName = "test_alter_table_with_invalid_reader_writer_version_" + randomNameSuffix(); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT)"); - - assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES reader_version = 0", - "Unable to set catalog 'delta_lake' table property 'reader_version' to \\[0]: reader_version must be between 1 and 2"); - assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES reader_version = 3", - "Unable to set catalog 'delta_lake' table property 'reader_version' to \\[3]: reader_version must be between 1 and 2"); - assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES writer_version = 1", - "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[1]: writer_version must be between 2 and 4"); - assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES writer_version = 5", - "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[5]: writer_version must be between 2 and 4"); - - assertUpdate("DROP TABLE " + tableName); - } - - @Test - public void testAlterTableUpgradeReaderWriterVersion() - { - String tableName = "test_alter_table_upgrade_reader_writer_version_" + randomNameSuffix(); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT)"); - - assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES reader_version = 2"); - assertThatShowCreateTable(tableName, ".*reader_version = 2.*"); - - assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES writer_version = 3"); - assertThatShowCreateTable(tableName, ".*writer_version = 3.*"); - - assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES reader_version = 2, writer_version = 4"); - assertThatShowCreateTable(tableName, ".*(reader_version = 2,(.*)writer_version = 4).*"); - - assertUpdate("DROP TABLE " + tableName); - } - - @Test - public void testAlterTableDowngradeReaderWriterVersion() - { - String tableName = "test_alter_table_downgrade_reader_writer_version_" + randomNameSuffix(); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (reader_version = 2, writer_version = 3)"); - - assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES reader_version = 1", - "reader_version cannot be downgraded from 2 to 1"); - assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES writer_version = 2", - "writer_version cannot be downgraded from 3 to 2"); - assertThatShowCreateTable(tableName, ".*(reader_version = 2,(.*)writer_version = 3).*"); - - assertUpdate("DROP TABLE " + tableName); - } - - @Test - public void testAlterTableWithUnsupportedProperties() - { - String tableName = "test_alter_table_with_unsupported_properties_" + randomNameSuffix(); - - assertUpdate("CREATE TABLE " + tableName + " (a_number INT)"); - - assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES change_data_feed_enabled = true", - "The following properties cannot be updated: change_data_feed_enabled"); - assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES change_data_feed_enabled = true, checkpoint_interval = 10", - "The following properties cannot be updated: change_data_feed_enabled, checkpoint_interval"); - assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES writer_version = 4, partitioned_by = ARRAY['a']", - "The following properties cannot be updated: partitioned_by"); - - assertUpdate("DROP TABLE " + tableName); - } - @Override protected void verifyAddNotNullColumnToNonEmptyTableFailurePermissible(Throwable e) { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java index fe4f8c84a6c6..93ee7fade662 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java @@ -19,10 +19,6 @@ import io.trino.plugin.hive.HiveCompressionCodec; import org.testng.annotations.Test; -import javax.validation.constraints.Max; -import javax.validation.constraints.Min; - -import java.lang.annotation.Annotation; import java.util.Map; import java.util.TimeZone; import java.util.concurrent.TimeUnit; @@ -30,7 +26,6 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; -import static io.airlift.testing.ValidationAssertions.assertFailsValidation; import static io.airlift.units.DataSize.Unit.GIGABYTE; import static io.trino.plugin.hive.util.TestHiveUtil.nonDefaultTimeZone; import static java.util.concurrent.TimeUnit.DAYS; @@ -72,9 +67,7 @@ public void testDefaults() .setTargetMaxFileSize(DataSize.of(1, GIGABYTE)) .setUniqueTableLocation(true) .setLegacyCreateTableWithExistingLocationEnabled(false) - .setRegisterTableProcedureEnabled(false) - .setDefaultReaderVersion(1) - .setDefaultWriterVersion(2)); + .setRegisterTableProcedureEnabled(false)); } @Test @@ -110,8 +103,6 @@ public void testExplicitPropertyMappings() .put("delta.unique-table-location", "false") .put("delta.legacy-create-table-with-existing-location.enabled", "true") .put("delta.register-table-procedure.enabled", "true") - .put("delta.default-reader-version", "2") - .put("delta.default-writer-version", "3") .buildOrThrow(); DeltaLakeConfig expected = new DeltaLakeConfig() @@ -143,29 +134,8 @@ public void testExplicitPropertyMappings() .setTargetMaxFileSize(DataSize.of(2, GIGABYTE)) .setUniqueTableLocation(false) .setLegacyCreateTableWithExistingLocationEnabled(true) - .setRegisterTableProcedureEnabled(true) - .setDefaultReaderVersion(2) - .setDefaultWriterVersion(3); + .setRegisterTableProcedureEnabled(true); assertFullMapping(properties, expected); } - - @Test - public void testValidation() - { - assertFailsReaderVersionValidation(new DeltaLakeConfig().setDefaultReaderVersion(0), Min.class); - assertFailsReaderVersionValidation(new DeltaLakeConfig().setDefaultReaderVersion(3), Max.class); - assertFailsWriterVersionValidation(new DeltaLakeConfig().setDefaultWriterVersion(1), Min.class); - assertFailsWriterVersionValidation(new DeltaLakeConfig().setDefaultWriterVersion(5), Max.class); - } - - private void assertFailsReaderVersionValidation(DeltaLakeConfig config, Class annotation) - { - assertFailsValidation(config, "defaultReaderVersion", "Must be in between 1 and 2", annotation); - } - - private void assertFailsWriterVersionValidation(DeltaLakeConfig config, Class annotation) - { - assertFailsValidation(config, "defaultWriterVersion", "Must be in between 2 and 4", annotation); - } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java index 6f9730ff768b..2637cc15f169 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java @@ -53,6 +53,8 @@ import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; +import static io.trino.plugin.deltalake.DeltaLakeMetadata.DEFAULT_READER_VERSION; +import static io.trino.plugin.deltalake.DeltaLakeMetadata.DEFAULT_WRITER_VERSION; import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.trino.spi.type.BigintType.BIGINT; @@ -161,7 +163,7 @@ private static ConnectorPageSink createPageSink(Path outputPath, DeltaLakeWriter true, Optional.empty(), Optional.of(false), - new ProtocolEntry(deltaLakeConfig.getDefaultReaderVersion(), deltaLakeConfig.getDefaultWriterVersion())); + new ProtocolEntry(DEFAULT_READER_VERSION, DEFAULT_WRITER_VERSION)); DeltaLakePageSinkProvider provider = new DeltaLakePageSinkProvider( new GroupByHashPageIndexerFactory(new JoinCompiler(new TypeOperators()), new BlockTypeOperators()),