From 7d7a9403979fc48f3df5a35b19f467ff195218e3 Mon Sep 17 00:00:00 2001 From: Vikash Kumar Date: Sat, 18 Feb 2023 01:43:08 +0530 Subject: [PATCH 1/4] Support reader_version and writer_version properties using CREATE TABLE --- docs/src/main/sphinx/connector/delta-lake.rst | 10 ++- .../plugin/deltalake/DeltaLakeMetadata.java | 40 ++++++--- .../deltalake/DeltaLakeOutputTableHandle.java | 12 ++- .../deltalake/DeltaLakeTableProperties.java | 38 +++++++++ .../BaseDeltaLakeConnectorSmokeTest.java | 12 ++- .../BaseDeltaLakeMinioConnectorTest.java | 85 ++++++++++++++++++- .../deltalake/TestDeltaLakePageSink.java | 6 +- ...akeDatabricksCheckpointsCompatibility.java | 4 +- 8 files changed, 188 insertions(+), 19 deletions(-) diff --git a/docs/src/main/sphinx/connector/delta-lake.rst b/docs/src/main/sphinx/connector/delta-lake.rst index 5826cada7f43..7281b690fa5c 100644 --- a/docs/src/main/sphinx/connector/delta-lake.rst +++ b/docs/src/main/sphinx/connector/delta-lake.rst @@ -563,15 +563,21 @@ The following properties are available for use: - Set the checkpoint interval in seconds. * - ``change_data_feed_enabled`` - Enables storing change data feed entries. + * - ``reader_version`` + - Set reader version. + * - ``writer_version`` + - Set writer version. -The following example uses all four table properties:: +The following example uses all six table properties:: CREATE TABLE example.default.example_partitioned_table WITH ( location = 's3://my-bucket/a/path', partitioned_by = ARRAY['regionkey'], checkpoint_interval = 5, - change_data_feed_enabled = true + change_data_feed_enabled = true, + reader_version = 2, + writer_version = 4 ) AS SELECT name, comment, regionkey FROM tpch.tiny.nation; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index a5f631775b64..12b02d32a4e2 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -181,10 +181,14 @@ import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHECKPOINT_INTERVAL_PROPERTY; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.LOCATION_PROPERTY; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.PARTITIONED_BY_PROPERTY; +import static io.trino.plugin.deltalake.DeltaLakeTableProperties.READER_VERSION_PROPERTY; +import static io.trino.plugin.deltalake.DeltaLakeTableProperties.WRITER_VERSION_PROPERTY; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getChangeDataFeedEnabled; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getCheckpointInterval; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getLocation; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getPartitionedBy; +import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getReaderVersion; +import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getWriterVersion; import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_PROPERTY; import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_VALUE; import static io.trino.plugin.deltalake.procedure.DeltaLakeTableProcedureId.OPTIMIZE; @@ -280,13 +284,14 @@ public class DeltaLakeMetadata public static final String SET_TBLPROPERTIES_OPERATION = "SET TBLPROPERTIES"; public static final String CHANGE_COLUMN_OPERATION = "CHANGE COLUMN"; public static final String ISOLATION_LEVEL = "WriteSerializable"; - private static final int READER_VERSION = 1; - // The required writer version used by tables created by Trino - private static final int WRITER_VERSION = 2; - // The highest writer version Trino supports writing to - private static final int MAX_WRITER_VERSION = 4; - // This constant should be used only for a new table - private static final ProtocolEntry DEFAULT_PROTOCOL = new ProtocolEntry(READER_VERSION, WRITER_VERSION); + + // The required reader and writer versions used by tables created by Trino + public static final int MIN_READER_VERSION = 1; + public static final int MIN_WRITER_VERSION = 2; + // The highest reader and writer versions Trino supports writing to + public static final int MAX_READER_VERSION = 2; + public static final int MAX_WRITER_VERSION = 4; + // Matches the dummy column Databricks stores in the metastore private static final List DUMMY_DATA_COLUMNS = ImmutableList.of( new Column("col", HiveType.toHiveType(new ArrayType(VarcharType.createUnboundedVarcharType())), Optional.empty())); @@ -479,6 +484,10 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect Optional changeDataFeedEnabled = tableHandle.getMetadataEntry().isChangeDataFeedEnabled(); changeDataFeedEnabled.ifPresent(value -> properties.put(CHANGE_DATA_FEED_ENABLED_PROPERTY, value)); + ProtocolEntry protocolEntry = getProtocolEntry(session, tableHandle.getSchemaTableName()); + properties.put(READER_VERSION_PROPERTY, protocolEntry.getMinReaderVersion()); + properties.put(WRITER_VERSION_PROPERTY, protocolEntry.getMinWriterVersion()); + return new ConnectorTableMetadata( tableHandle.getSchemaTableName(), columns, @@ -747,7 +756,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe nodeVersion, nodeId, tableMetadata.getComment(), - DEFAULT_PROTOCOL); + getProtocolEntry(tableMetadata.getProperties())); setRollback(() -> deleteRecursivelyIfExists(fileSystem, deltaLogDirectory)); transactionLogWriter.flush(); @@ -871,7 +880,8 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, Con getCheckpointInterval(tableMetadata.getProperties()), external, tableMetadata.getComment(), - getChangeDataFeedEnabled(tableMetadata.getProperties())); + getChangeDataFeedEnabled(tableMetadata.getProperties()), + getProtocolEntry(tableMetadata.getProperties())); } private Optional getSchemaLocation(Database database) @@ -989,7 +999,7 @@ public Optional finishCreateTable( nodeVersion, nodeId, handle.getComment(), - DEFAULT_PROTOCOL); + handle.getProtocolEntry()); appendAddFileEntries(transactionLogWriter, dataFileInfos, handle.getPartitionedBy(), true); transactionLogWriter.flush(); @@ -1805,6 +1815,16 @@ private ProtocolEntry getProtocolEntry(ConnectorSession session, SchemaTableName return metastore.getProtocol(session, metastore.getSnapshot(schemaTableName, session)); } + private ProtocolEntry getProtocolEntry(Map properties) + { + Optional readerVersion = getReaderVersion(properties); + Optional writerVersion = getWriterVersion(properties); + + return new ProtocolEntry( + readerVersion.orElse(MIN_READER_VERSION), + writerVersion.orElse(MIN_WRITER_VERSION)); + } + private void writeCheckpointIfNeeded(ConnectorSession session, SchemaTableName table, Optional checkpointInterval, long newVersion) { try { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeOutputTableHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeOutputTableHandle.java index 1fe2b02d7b09..7b01c669ff09 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeOutputTableHandle.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeOutputTableHandle.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; +import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.spi.connector.ConnectorOutputTableHandle; import java.util.List; @@ -37,6 +38,7 @@ public class DeltaLakeOutputTableHandle private final boolean external; private final Optional comment; private final Optional changeDataFeedEnabled; + private final ProtocolEntry protocolEntry; @JsonCreator public DeltaLakeOutputTableHandle( @@ -47,7 +49,8 @@ public DeltaLakeOutputTableHandle( @JsonProperty("checkpointInterval") Optional checkpointInterval, @JsonProperty("external") boolean external, @JsonProperty("comment") Optional comment, - @JsonProperty("changeDataFeedEnabled") Optional changeDataFeedEnabled) + @JsonProperty("changeDataFeedEnabled") Optional changeDataFeedEnabled, + @JsonProperty("protocolEntry") ProtocolEntry protocolEntry) { this.schemaName = requireNonNull(schemaName, "schemaName is null"); this.tableName = requireNonNull(tableName, "tableName is null"); @@ -57,6 +60,7 @@ public DeltaLakeOutputTableHandle( this.external = external; this.comment = requireNonNull(comment, "comment is null"); this.changeDataFeedEnabled = requireNonNull(changeDataFeedEnabled, "changeDataFeedEnabled is null"); + this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null"); } @JsonProperty @@ -115,4 +119,10 @@ public Optional getChangeDataFeedEnabled() { return changeDataFeedEnabled; } + + @JsonProperty + public ProtocolEntry getProtocolEntry() + { + return protocolEntry; + } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java index 243baba55b7b..aa1c900b5016 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableProperties.java @@ -26,8 +26,13 @@ import java.util.Optional; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_READER_VERSION; +import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_WRITER_VERSION; +import static io.trino.plugin.deltalake.DeltaLakeMetadata.MIN_READER_VERSION; +import static io.trino.plugin.deltalake.DeltaLakeMetadata.MIN_WRITER_VERSION; import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY; import static io.trino.spi.session.PropertyMetadata.booleanProperty; +import static io.trino.spi.session.PropertyMetadata.integerProperty; import static io.trino.spi.session.PropertyMetadata.longProperty; import static io.trino.spi.session.PropertyMetadata.stringProperty; import static io.trino.spi.type.VarcharType.VARCHAR; @@ -40,6 +45,8 @@ public class DeltaLakeTableProperties public static final String PARTITIONED_BY_PROPERTY = "partitioned_by"; public static final String CHECKPOINT_INTERVAL_PROPERTY = "checkpoint_interval"; public static final String CHANGE_DATA_FEED_ENABLED_PROPERTY = "change_data_feed_enabled"; + public static final String READER_VERSION_PROPERTY = "reader_version"; + public static final String WRITER_VERSION_PROPERTY = "writer_version"; private final List> tableProperties; @@ -74,6 +81,18 @@ public DeltaLakeTableProperties() "Enables storing change data feed entries", null, false)) + .add(integerProperty( + READER_VERSION_PROPERTY, + "Reader version", + null, + value -> validateVersion(READER_VERSION_PROPERTY, value, MIN_READER_VERSION, MAX_READER_VERSION), + false)) + .add(integerProperty( + WRITER_VERSION_PROPERTY, + "Writer version", + null, + value -> validateVersion(WRITER_VERSION_PROPERTY, value, MIN_WRITER_VERSION, MAX_WRITER_VERSION), + false)) .build(); } @@ -109,4 +128,23 @@ public static Optional getChangeDataFeedEnabled(Map tab { return Optional.ofNullable((Boolean) tableProperties.get(CHANGE_DATA_FEED_ENABLED_PROPERTY)); } + + public static Optional getReaderVersion(Map tableProperties) + { + return Optional.ofNullable((Integer) tableProperties.get(READER_VERSION_PROPERTY)); + } + + public static Optional getWriterVersion(Map tableProperties) + { + return Optional.ofNullable((Integer) tableProperties.get(WRITER_VERSION_PROPERTY)); + } + + private static void validateVersion(String propertyName, Object value, int minSupportedVersion, int maxSupportedVersion) + { + int version = (int) value; + if (version < minSupportedVersion || version > maxSupportedVersion) { + throw new TrinoException(INVALID_TABLE_PROPERTY, format( + "%s must be between %d and %d", propertyName, minSupportedVersion, maxSupportedVersion)); + } + } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java index 0bdcb8b89895..bbbb237895f5 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java @@ -347,7 +347,9 @@ public void testShowCreateTable() ")\n" + "WITH (\n" + " location = '%s',\n" + - " partitioned_by = ARRAY['age']\n" + + " partitioned_by = ARRAY['age'],\n" + + " reader_version = 1,\n" + + " writer_version = 2\n" + ")", SCHEMA, getLocationForTable(bucketName, "person"))); @@ -476,7 +478,9 @@ public void testCreatePartitionedTableAs() ")\n" + "WITH (\n" + " location = '%s',\n" + - " partitioned_by = ARRAY['regionkey']\n" + + " partitioned_by = ARRAY['regionkey'],\n" + + " reader_version = 1,\n" + + " writer_version = 2\n" + ")", DELTA_CATALOG, SCHEMA, tableName, getLocationForTable(bucketName, tableName))); assertQuery("SELECT * FROM " + tableName, "SELECT name, regionkey, comment FROM nation"); @@ -1347,7 +1351,9 @@ private void testDeltaLakeTableLocationChanged(boolean fewerEntries, boolean fir ")\n" + "WITH (\n" + " location = '%s',\n" + - " partitioned_by = ARRAY[%s]\n" + + " partitioned_by = ARRAY[%s],\n" + + " reader_version = 1,\n" + + " writer_version = 2\n" + ")", getSession().getCatalog().orElseThrow(), SCHEMA, diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java index 25a92ecb1dba..ac286db4505f 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java @@ -38,6 +38,7 @@ import java.util.Optional; import java.util.OptionalInt; import java.util.Set; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -260,7 +261,9 @@ public void testShowCreateTable() ")\n" + "WITH (\n" + " location = \\E'.*/test_schema/orders',\n\\Q" + - " partitioned_by = ARRAY[]\n" + + " partitioned_by = ARRAY[],\n" + + " reader_version = 1,\n" + + " writer_version = 2\n" + ")"); } @@ -862,6 +865,80 @@ public void testThatEnableCdfTablePropertyIsShownForCtasTables() .contains("change_data_feed_enabled = true"); } + @Test + public void testCreateTableWithInvalidReaderWriterVersion() + { + String tableName = "test_create_table_with_invalid_reader_writer_version_fails_" + randomNameSuffix(); + + assertQueryFails("CREATE TABLE " + tableName + " (a_number INT) WITH (reader_version = 0)", + "Unable to set catalog 'delta_lake' table property 'reader_version' to \\[0]: reader_version must be between 1 and 2"); + assertQueryFails("CREATE TABLE " + tableName + " (a_number INT) WITH (reader_version = 3)", + "Unable to set catalog 'delta_lake' table property 'reader_version' to \\[3]: reader_version must be between 1 and 2"); + assertQueryFails("CREATE TABLE " + tableName + " (a_number INT) WITH (writer_version = 1)", + "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[1]: writer_version must be between 2 and 4"); + assertQueryFails("CREATE TABLE " + tableName + " (a_number INT) WITH (writer_version = 5)", + "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[5]: writer_version must be between 2 and 4"); + } + + @Test + public void testCreateTableWithValidReaderWriterVersion() + { + String tableName = "test_create_table_with_valid_reader_writer_version_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " (a_number INT)"); + assertThatShowCreateTable(tableName, ".*(reader_version = 1,(.*)writer_version = 2).*"); // default reader and writer version + assertUpdate("DROP TABLE " + tableName); + + assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (reader_version = 2)"); + assertThatShowCreateTable(tableName, ".*reader_version = 2.*"); + assertUpdate("DROP TABLE " + tableName); + + assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (writer_version = 4)"); + assertThatShowCreateTable(tableName, ".*writer_version = 4.*"); + assertUpdate("DROP TABLE " + tableName); + + assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (reader_version = 2, writer_version = 3)"); + assertThatShowCreateTable(tableName, ".*(reader_version = 2,(.*)writer_version = 3).*"); + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testCreateTableAsWithInvalidReaderWriterVersion() + { + String tableName = "test_create_table_as_with_invalid_reader_writer_version_fails_" + randomNameSuffix(); + + assertQueryFails("CREATE TABLE " + tableName + " (a_number) WITH (reader_version = 0) AS VALUES (1), (2)", + "Unable to set catalog 'delta_lake' table property 'reader_version' to \\[0]: reader_version must be between 1 and 2"); + assertQueryFails("CREATE TABLE " + tableName + " (a_number) WITH (reader_version = 3) AS VALUES (1), (2)", + "Unable to set catalog 'delta_lake' table property 'reader_version' to \\[3]: reader_version must be between 1 and 2"); + assertQueryFails("CREATE TABLE " + tableName + " (a_number) WITH (writer_version = 1) AS VALUES (1), (2)", + "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[1]: writer_version must be between 2 and 4"); + assertQueryFails("CREATE TABLE " + tableName + " (a_number) WITH (writer_version = 5) AS VALUES (1), (2)", + "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[5]: writer_version must be between 2 and 4"); + } + + @Test + public void testCreateTableAsWithValidReaderWriterVersion() + { + String tableName = "test_create_table_as_with_valid_reader_writer_version_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " (a_number) AS VALUES (1), (2)", 2); + assertThatShowCreateTable(tableName, ".*(reader_version = 1,(.*)writer_version = 2).*"); // default reader and writer version + assertUpdate("DROP TABLE " + tableName); + + assertUpdate("CREATE TABLE " + tableName + " (a_number) WITH (reader_version = 2) AS VALUES (1), (2)", 2); + assertThatShowCreateTable(tableName, ".*reader_version = 2.*"); + assertUpdate("DROP TABLE " + tableName); + + assertUpdate("CREATE TABLE " + tableName + " (a_number) WITH (writer_version = 4) AS VALUES (1), (2)", 2); + assertThatShowCreateTable(tableName, ".*writer_version = 4.*"); + assertUpdate("DROP TABLE " + tableName); + + assertUpdate("CREATE TABLE " + tableName + " (a_number) WITH (reader_version = 2, writer_version = 3) AS VALUES (1), (2)", 2); + assertThatShowCreateTable(tableName, ".*(reader_version = 2,(.*)writer_version = 3).*"); + assertUpdate("DROP TABLE " + tableName); + } + @Override protected void verifyAddNotNullColumnToNonEmptyTableFailurePermissible(Throwable e) { @@ -923,4 +1000,10 @@ private List getTableFiles(String tableName) .map(path -> format("s3://%s/%s", bucketName, path)) .collect(toImmutableList()); } + + private void assertThatShowCreateTable(String tableName, String expectedRegex) + { + assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName)) + .matches(Pattern.compile(expectedRegex, Pattern.DOTALL)); + } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java index 6c8ec154f324..5efc02fa2a34 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java @@ -20,6 +20,7 @@ import io.airlift.slice.Slices; import io.trino.filesystem.hdfs.HdfsFileSystemFactory; import io.trino.operator.GroupByHashPageIndexerFactory; +import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.plugin.hive.HiveTransactionHandle; import io.trino.plugin.hive.NodeVersion; import io.trino.spi.Page; @@ -52,6 +53,8 @@ import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; +import static io.trino.plugin.deltalake.DeltaLakeMetadata.MIN_READER_VERSION; +import static io.trino.plugin.deltalake.DeltaLakeMetadata.MIN_WRITER_VERSION; import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.trino.spi.type.BigintType.BIGINT; @@ -159,7 +162,8 @@ private static ConnectorPageSink createPageSink(Path outputPath, DeltaLakeWriter Optional.of(deltaLakeConfig.getDefaultCheckpointWritingInterval()), true, Optional.empty(), - Optional.of(false)); + Optional.of(false), + new ProtocolEntry(MIN_READER_VERSION, MIN_WRITER_VERSION)); DeltaLakePageSinkProvider provider = new DeltaLakePageSinkProvider( new GroupByHashPageIndexerFactory(new JoinCompiler(new TypeOperators()), new BlockTypeOperators()), diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java index 30e8d719a978..174d801d9102 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java @@ -158,7 +158,9 @@ public void testTrinoUsesCheckpointInterval() "WITH (\n" + " checkpoint_interval = 5,\n" + " location = 's3://%s/%s',\n" + - " partitioned_by = ARRAY['a_number']\n" + + " partitioned_by = ARRAY['a_number'],\n" + + " reader_version = 1,\n" + + " writer_version = 2\n" + ")", tableName, bucketName, From 1974db2b704566923f375ada809ffe667bd20597 Mon Sep 17 00:00:00 2001 From: Vikash Kumar Date: Tue, 21 Feb 2023 15:00:54 +0530 Subject: [PATCH 2/4] Set writer_version to v4 when change_data_feed_enabled is set to true --- .../plugin/deltalake/DeltaLakeMetadata.java | 16 +++++ .../BaseDeltaLakeMinioConnectorTest.java | 63 +++++++++++++++++++ 2 files changed, 79 insertions(+) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 12b02d32a4e2..ee159a477088 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -292,6 +292,8 @@ public class DeltaLakeMetadata public static final int MAX_READER_VERSION = 2; public static final int MAX_WRITER_VERSION = 4; + private static final int CDF_SUPPORTED_WRITER_VERSION = 4; + // Matches the dummy column Databricks stores in the metastore private static final List DUMMY_DATA_COLUMNS = ImmutableList.of( new Column("col", HiveType.toHiveType(new ArrayType(VarcharType.createUnboundedVarcharType())), Optional.empty())); @@ -1819,7 +1821,21 @@ private ProtocolEntry getProtocolEntry(Map properties) { Optional readerVersion = getReaderVersion(properties); Optional writerVersion = getWriterVersion(properties); + Optional changeDataFeedEnabled = getChangeDataFeedEnabled(properties); + if (changeDataFeedEnabled.isPresent() && changeDataFeedEnabled.get()) { + if (writerVersion.isPresent()) { + if (writerVersion.get() < CDF_SUPPORTED_WRITER_VERSION) { + throw new TrinoException( + INVALID_TABLE_PROPERTY, + WRITER_VERSION_PROPERTY + " cannot be set less than " + CDF_SUPPORTED_WRITER_VERSION + " when cdf is enabled"); + } + } + else { + // Enabling cdf (change data feed) requires setting the writer version to 4 + writerVersion = Optional.of(CDF_SUPPORTED_WRITER_VERSION); + } + } return new ProtocolEntry( readerVersion.orElse(MIN_READER_VERSION), writerVersion.orElse(MIN_WRITER_VERSION)); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java index ac286db4505f..4910bd965fcd 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java @@ -939,6 +939,69 @@ public void testCreateTableAsWithValidReaderWriterVersion() assertUpdate("DROP TABLE " + tableName); } + @Test + public void testCreateTableWithCdfEnabledWriterVersionUpgraded() + { + String tableName = "test_create_table_with_cdf_enabled_writer_version_upgraded_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (change_data_feed_enabled = true)"); + assertThatShowCreateTable(tableName, ".*(change_data_feed_enabled = true,(.*)writer_version = 4).*"); + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testCreateTableWithCdfPropertyWriterVersionNotUpgraded() + { + String tableName = "test_create_table_writer_version_not_upgraded_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (change_data_feed_enabled = false, writer_version = 3)"); + assertThatShowCreateTable(tableName, ".*(change_data_feed_enabled = false,(.*)writer_version = 3).*"); + assertUpdate("DROP TABLE " + tableName); + + assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (change_data_feed_enabled = true, writer_version = 4)"); + assertThatShowCreateTable(tableName, ".*(change_data_feed_enabled = true,(.*)writer_version = 4).*"); + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testCreateTableAsWithCdfEnabledWriterVersionUpgraded() + { + String tableName = "test_create_table_as_with_cdf_enabled_writer_version_upgraded_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " (a_number) WITH (change_data_feed_enabled = true) AS VALUES (1), (2)", 2); + assertThatShowCreateTable(tableName, ".*(change_data_feed_enabled = true,(.*)writer_version = 4).*"); + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testCreateTableAsWithCdfPropertyWriterVersionNotUpgraded() + { + String tableName = "test_create_table_as_writer_version_not_upgraded_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " (a_number) WITH (change_data_feed_enabled = false, writer_version = 3) AS VALUES (1), (2)", 2); + assertThatShowCreateTable(tableName, ".*(change_data_feed_enabled = false,(.*)writer_version = 3).*"); + assertUpdate("DROP TABLE " + tableName); + + assertUpdate("CREATE TABLE " + tableName + " (a_number) WITH (change_data_feed_enabled = true, writer_version = 4) AS VALUES (1), (2)", 2); + assertThatShowCreateTable(tableName, ".*(change_data_feed_enabled = true,(.*)writer_version = 4).*"); + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testCreateTableWithCdfEnabledAndUnsupportedWriterVersionFails() + { + String tableName = "test_create_table_with_cdf_enabled_and_unsupported_writer_version_" + randomNameSuffix(); + + assertQueryFails("CREATE TABLE " + tableName + " (a_number INT) WITH (change_data_feed_enabled = true, writer_version = 3)", + "writer_version cannot be set less than 4 when cdf is enabled"); + assertQueryFails("CREATE TABLE " + tableName + " (a_number INT) WITH (change_data_feed_enabled = true, writer_version = 5)", + "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[5]: writer_version must be between 2 and 4"); + assertQueryFails("CREATE TABLE " + tableName + " (a_number) WITH (change_data_feed_enabled = true, writer_version = 3) AS VALUES (1), (2)", + "writer_version cannot be set less than 4 when cdf is enabled"); + assertQueryFails("CREATE TABLE " + tableName + " (a_number) WITH (change_data_feed_enabled = true, writer_version = 5) AS VALUES (1), (2)", + "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[5]: writer_version must be between 2 and 4"); + } + @Override protected void verifyAddNotNullColumnToNonEmptyTableFailurePermissible(Throwable e) { From 79b50b93fc215dc4d18daf45374e1c9bb4a07a2b Mon Sep 17 00:00:00 2001 From: Vikash Kumar Date: Tue, 21 Feb 2023 22:17:34 +0530 Subject: [PATCH 3/4] Create DeltaLakeMetadata#getCommitInfoEntry to reduce code duplication --- .../plugin/deltalake/DeltaLakeMetadata.java | 97 ++++++------------- 1 file changed, 28 insertions(+), 69 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index ee159a477088..c94704058c55 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -755,8 +755,6 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe configurationForNewTable(checkpointInterval, changeDataFeedEnabled), CREATE_TABLE_OPERATION, session, - nodeVersion, - nodeId, tableMetadata.getComment(), getProtocolEntry(tableMetadata.getProperties())); @@ -998,8 +996,6 @@ public Optional finishCreateTable( configurationForNewTable(handle.getCheckpointInterval(), handle.getChangeDataFeedEnabled()), CREATE_TABLE_AS_OPERATION, session, - nodeVersion, - nodeId, handle.getComment(), handle.getProtocolEntry()); appendAddFileEntries(transactionLogWriter, dataFileInfos, handle.getPartitionedBy(), true); @@ -1093,8 +1089,6 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table handle.getMetadataEntry().getConfiguration(), SET_TBLPROPERTIES_OPERATION, session, - nodeVersion, - nodeId, comment, getProtocolEntry(session, handle.getSchemaTableName())); transactionLogWriter.flush(); @@ -1141,8 +1135,6 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl deltaLakeTableHandle.getMetadataEntry().getConfiguration(), CHANGE_COLUMN_OPERATION, session, - nodeVersion, - nodeId, Optional.ofNullable(deltaLakeTableHandle.getMetadataEntry().getDescription()), getProtocolEntry(session, deltaLakeTableHandle.getSchemaTableName())); transactionLogWriter.flush(); @@ -1200,8 +1192,6 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle handle.getMetadataEntry().getConfiguration(), ADD_COLUMN_OPERATION, session, - nodeVersion, - nodeId, Optional.ofNullable(handle.getMetadataEntry().getDescription()), getProtocolEntry(session, handle.getSchemaTableName())); transactionLogWriter.flush(); @@ -1211,7 +1201,7 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle } } - private static void appendTableEntries( + private void appendTableEntries( long commitVersion, TransactionLogWriter transactionLogWriter, String tableId, @@ -1223,26 +1213,11 @@ private static void appendTableEntries( Map configuration, String operation, ConnectorSession session, - String nodeVersion, - String nodeId, Optional comment, ProtocolEntry protocolEntry) { long createdTime = System.currentTimeMillis(); - transactionLogWriter.appendCommitInfoEntry( - new CommitInfoEntry( - commitVersion, - createdTime, - session.getUser(), - session.getUser(), - operation, - ImmutableMap.of("queryId", session.getQueryId()), - null, - null, - "trino-" + nodeVersion + "-" + nodeId, - 0, - ISOLATION_LEVEL, - Optional.of(true))); + transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, commitVersion, createdTime, operation, 0)); transactionLogWriter.appendProtocolEntry(protocolEntry); @@ -1377,20 +1352,8 @@ public Optional finishInsert( commitVersion - 1)); } Optional checkpointInterval = handle.getMetadataEntry().getCheckpointInterval(); - transactionLogWriter.appendCommitInfoEntry( - new CommitInfoEntry( - commitVersion, - createdTime, - session.getUser(), - session.getUser(), - INSERT_OPERATION, - ImmutableMap.of("queryId", session.getQueryId()), - null, - null, - "trino-" + nodeVersion + "-" + nodeId, - handle.getReadVersion(), // it is not obvious why we need to persist this readVersion - ISOLATION_LEVEL, - Optional.of(true))); + // it is not obvious why we need to persist this readVersion + transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, commitVersion, createdTime, INSERT_OPERATION, handle.getReadVersion())); // Note: during writes we want to preserve original case of partition columns List partitionColumns = handle.getMetadataEntry().getOriginalPartitionColumns(); @@ -1525,20 +1488,7 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle tabl } long commitVersion = currentVersion + 1; - transactionLogWriter.appendCommitInfoEntry( - new CommitInfoEntry( - commitVersion, - createdTime, - session.getUser(), - session.getUser(), - MERGE_OPERATION, - ImmutableMap.of("queryId", session.getQueryId()), - null, - null, - "trino-" + nodeVersion + "-" + nodeId, - handle.getReadVersion(), - ISOLATION_LEVEL, - Optional.of(true))); + transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, commitVersion, createdTime, MERGE_OPERATION, handle.getReadVersion())); // TODO: Delta writes another field "operationMetrics" (https://github.com/trinodb/trino/issues/12005) long writeTimestamp = Instant.now().toEpochMilli(); @@ -1736,20 +1686,7 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl long createdTime = Instant.now().toEpochMilli(); long commitVersion = readVersion + 1; - transactionLogWriter.appendCommitInfoEntry( - new CommitInfoEntry( - commitVersion, - createdTime, - session.getUser(), - session.getUser(), - OPTIMIZE_OPERATION, - ImmutableMap.of("queryId", session.getQueryId()), - null, - null, - "trino-" + nodeVersion + "-" + nodeId, - readVersion, - ISOLATION_LEVEL, - Optional.of(true))); + transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, commitVersion, createdTime, OPTIMIZE_OPERATION, readVersion)); // TODO: Delta writes another field "operationMetrics" that I haven't // seen before. It contains delete/update metrics. Investigate/include it. @@ -1917,6 +1854,28 @@ public void renameTable(ConnectorSession session, ConnectorTableHandle tableHand metastore.renameTable(session, handle.getSchemaTableName(), newTableName); } + private CommitInfoEntry getCommitInfoEntry( + ConnectorSession session, + long commitVersion, + long createdTime, + String operation, + long readVersion) + { + return new CommitInfoEntry( + commitVersion, + createdTime, + session.getUser(), + session.getUser(), + operation, + ImmutableMap.of("queryId", session.getQueryId()), + null, + null, + "trino-" + nodeVersion + "-" + nodeId, + readVersion, + ISOLATION_LEVEL, + Optional.of(true)); + } + @Override public Map getSchemaProperties(ConnectorSession session, CatalogSchemaName schemaName) { From 082a0c2caba8e5ef24eda0a3615b591a07b8f0cc Mon Sep 17 00:00:00 2001 From: Vikash Kumar Date: Sat, 18 Feb 2023 05:56:41 +0530 Subject: [PATCH 4/4] Support reader_version and writer_version properties using ALTER TABLE --- .../plugin/deltalake/DeltaLakeMetadata.java | 54 ++++++++++++++ .../BaseDeltaLakeMinioConnectorTest.java | 71 +++++++++++++++++++ 2 files changed, 125 insertions(+) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index c94704058c55..10e910e3ccef 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -158,6 +158,7 @@ import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.MoreCollectors.toOptional; +import static com.google.common.collect.Sets.difference; import static io.trino.plugin.deltalake.DataFileInfo.DataFileType.DATA; import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.getColumnNames; import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.getFilesModifiedAfterProperty; @@ -302,6 +303,7 @@ public class DeltaLakeMetadata .add(NUMBER_OF_DISTINCT_VALUES_SUMMARY) .build(); private static final String ENABLE_NON_CONCURRENT_WRITES_CONFIGURATION_KEY = "delta.enable-non-concurrent-writes"; + public static final Set UPDATABLE_TABLE_PROPERTIES = ImmutableSet.of(READER_VERSION_PROPERTY, WRITER_VERSION_PROPERTY); private final DeltaLakeMetastore metastore; private final TrinoFileSystemFactory fileSystemFactory; @@ -1876,6 +1878,58 @@ private CommitInfoEntry getCommitInfoEntry( Optional.of(true)); } + @Override + public void setTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle, Map> properties) + { + Set unsupportedProperties = difference(properties.keySet(), UPDATABLE_TABLE_PROPERTIES); + if (!unsupportedProperties.isEmpty()) { + throw new TrinoException(NOT_SUPPORTED, "The following properties cannot be updated: " + String.join(", ", unsupportedProperties)); + } + + DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle; + ProtocolEntry currentProtocolEntry = getProtocolEntry(session, handle.getSchemaTableName()); + + Optional readerVersion = Optional.empty(); + if (properties.containsKey(READER_VERSION_PROPERTY)) { + readerVersion = Optional.of((int) properties.get(READER_VERSION_PROPERTY) + .orElseThrow(() -> new IllegalArgumentException("The " + READER_VERSION_PROPERTY + " property cannot be empty"))); + if (readerVersion.get() < currentProtocolEntry.getMinReaderVersion()) { + throw new TrinoException(INVALID_TABLE_PROPERTY, format( + "%s cannot be downgraded from %d to %d", READER_VERSION_PROPERTY, currentProtocolEntry.getMinReaderVersion(), readerVersion.get())); + } + } + + Optional writerVersion = Optional.empty(); + if (properties.containsKey(WRITER_VERSION_PROPERTY)) { + writerVersion = Optional.of((int) properties.get(WRITER_VERSION_PROPERTY) + .orElseThrow(() -> new IllegalArgumentException("The " + WRITER_VERSION_PROPERTY + " property cannot be empty"))); + if (writerVersion.get() < currentProtocolEntry.getMinWriterVersion()) { + throw new TrinoException(INVALID_TABLE_PROPERTY, format( + "%s cannot be downgraded from %d to %d", WRITER_VERSION_PROPERTY, currentProtocolEntry.getMinWriterVersion(), writerVersion.get())); + } + } + + long readVersion = handle.getReadVersion(); + long commitVersion = readVersion + 1; + + ProtocolEntry protocolEntry = new ProtocolEntry( + readerVersion.orElse(currentProtocolEntry.getMinReaderVersion()), + writerVersion.orElse(currentProtocolEntry.getMinWriterVersion())); + + try { + TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, handle.getLocation()); + + long createdTime = Instant.now().toEpochMilli(); + transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, commitVersion, createdTime, SET_TBLPROPERTIES_OPERATION, readVersion)); + transactionLogWriter.appendProtocolEntry(protocolEntry); + + transactionLogWriter.flush(); + } + catch (IOException e) { + throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Failed to write Delta Lake transaction log entry", e); + } + } + @Override public Map getSchemaProperties(ConnectorSession session, CatalogSchemaName schemaName) { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java index 4910bd965fcd..ea8cf9b817a9 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java @@ -1002,6 +1002,77 @@ public void testCreateTableWithCdfEnabledAndUnsupportedWriterVersionFails() "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[5]: writer_version must be between 2 and 4"); } + @Test + public void testAlterTableWithInvalidReaderWriterVersion() + { + String tableName = "test_alter_table_with_invalid_reader_writer_version_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " (a_number INT)"); + + assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES reader_version = 0", + "Unable to set catalog 'delta_lake' table property 'reader_version' to \\[0]: reader_version must be between 1 and 2"); + assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES reader_version = 3", + "Unable to set catalog 'delta_lake' table property 'reader_version' to \\[3]: reader_version must be between 1 and 2"); + assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES writer_version = 1", + "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[1]: writer_version must be between 2 and 4"); + assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES writer_version = 5", + "Unable to set catalog 'delta_lake' table property 'writer_version' to \\[5]: writer_version must be between 2 and 4"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testAlterTableUpgradeReaderWriterVersion() + { + String tableName = "test_alter_table_upgrade_reader_writer_version_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " (a_number INT)"); + + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES reader_version = 2"); + assertThatShowCreateTable(tableName, ".*reader_version = 2.*"); + + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES writer_version = 3"); + assertThatShowCreateTable(tableName, ".*writer_version = 3.*"); + + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES reader_version = 2, writer_version = 4"); + assertThatShowCreateTable(tableName, ".*(reader_version = 2,(.*)writer_version = 4).*"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testAlterTableDowngradeReaderWriterVersion() + { + String tableName = "test_alter_table_downgrade_reader_writer_version_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " (a_number INT) WITH (reader_version = 2, writer_version = 3)"); + + assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES reader_version = 1", + "reader_version cannot be downgraded from 2 to 1"); + assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES writer_version = 2", + "writer_version cannot be downgraded from 3 to 2"); + assertThatShowCreateTable(tableName, ".*(reader_version = 2,(.*)writer_version = 3).*"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testAlterTableWithUnsupportedProperties() + { + String tableName = "test_alter_table_with_unsupported_properties_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " (a_number INT)"); + + assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES change_data_feed_enabled = true", + "The following properties cannot be updated: change_data_feed_enabled"); + assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES change_data_feed_enabled = true, checkpoint_interval = 10", + "The following properties cannot be updated: change_data_feed_enabled, checkpoint_interval"); + assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES writer_version = 4, partitioned_by = ARRAY['a']", + "The following properties cannot be updated: partitioned_by"); + + assertUpdate("DROP TABLE " + tableName); + } + @Override protected void verifyAddNotNullColumnToNonEmptyTableFailurePermissible(Throwable e) {