diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 43b59e2d2860..9d5025aecb87 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -698,6 +698,7 @@ jobs: - suite-delta-lake-databricks73 - suite-delta-lake-databricks91 - suite-delta-lake-databricks104 + - suite-delta-lake-databricks113 - suite-gcs exclude: - config: default @@ -737,6 +738,11 @@ jobs: - suite: suite-delta-lake-databricks104 ignore exclusion if: >- ${{ secrets.DATABRICKS_TOKEN != '' }} + - suite: suite-delta-lake-databricks113 + config: hdp3 + - suite: suite-delta-lake-databricks113 + ignore exclusion if: >- + ${{ secrets.DATABRICKS_TOKEN != '' }} ignore exclusion if: # Do not use this property outside of the matrix configuration. @@ -811,6 +817,7 @@ jobs: DATABRICKS_73_JDBC_URL: DATABRICKS_91_JDBC_URL: DATABRICKS_104_JDBC_URL: + DATABRICKS_113_JDBC_URL: DATABRICKS_LOGIN: DATABRICKS_TOKEN: GCP_CREDENTIALS_KEY: @@ -875,6 +882,7 @@ jobs: DATABRICKS_73_JDBC_URL: ${{ secrets.DATABRICKS_73_JDBC_URL }} DATABRICKS_91_JDBC_URL: ${{ secrets.DATABRICKS_91_JDBC_URL }} DATABRICKS_104_JDBC_URL: ${{ secrets.DATABRICKS_104_JDBC_URL }} + DATABRICKS_113_JDBC_URL: ${{ secrets.DATABRICKS_113_JDBC_URL }} DATABRICKS_LOGIN: token DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }} GCP_CREDENTIALS_KEY: ${{ secrets.GCP_CREDENTIALS_KEY }} diff --git a/docs/src/main/sphinx/connector/delta-lake.rst b/docs/src/main/sphinx/connector/delta-lake.rst index 4cf9834f6430..2a60c64e0b7c 100644 --- a/docs/src/main/sphinx/connector/delta-lake.rst +++ b/docs/src/main/sphinx/connector/delta-lake.rst @@ -16,7 +16,7 @@ Requirements To connect to Databricks Delta Lake, you need: -* Tables written by Databricks Runtime 7.3 LTS, 9.1 LTS and 10.4 LTS are supported. +* Tables written by Databricks Runtime 7.3 LTS, 9.1 LTS, 10.4 LTS and 11.3 LTS are supported. * Deployments using AWS, HDFS, Azure Storage, and Google Cloud Storage (GCS) are fully supported. * Network access from the coordinator and workers to the Delta Lake storage. diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeDeltaLakeDatabricks113.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeDeltaLakeDatabricks113.java new file mode 100644 index 000000000000..d8248cec2d60 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeDeltaLakeDatabricks113.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.launcher.env.environment; + +import com.google.inject.Inject; +import io.trino.tests.product.launcher.docker.DockerFiles; +import io.trino.tests.product.launcher.env.common.Standard; +import io.trino.tests.product.launcher.env.common.TestsEnvironment; + +import static java.util.Objects.requireNonNull; + +@TestsEnvironment +public class EnvSinglenodeDeltaLakeDatabricks113 + extends AbstractSinglenodeDeltaLakeDatabricks +{ + @Inject + public EnvSinglenodeDeltaLakeDatabricks113(Standard standard, DockerFiles dockerFiles) + { + super(standard, dockerFiles); + } + + @Override + String databricksTestJdbcUrl() + { + return requireNonNull(System.getenv("DATABRICKS_113_JDBC_URL"), "Environment DATABRICKS_113_JDBC_URL was not set"); + } +} diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteDeltaLakeDatabricks113.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteDeltaLakeDatabricks113.java new file mode 100644 index 000000000000..989ba6495b90 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteDeltaLakeDatabricks113.java @@ -0,0 +1,39 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.launcher.suite.suites; + +import com.google.common.collect.ImmutableList; +import io.trino.tests.product.launcher.env.EnvironmentConfig; +import io.trino.tests.product.launcher.env.environment.EnvSinglenodeDeltaLakeDatabricks113; +import io.trino.tests.product.launcher.suite.SuiteDeltaLakeDatabricks; +import io.trino.tests.product.launcher.suite.SuiteTestRun; + +import java.util.List; + +import static io.trino.tests.product.launcher.suite.SuiteTestRun.testOnEnvironment; + +public class SuiteDeltaLakeDatabricks113 + extends SuiteDeltaLakeDatabricks +{ + @Override + public List getTestRuns(EnvironmentConfig config) + { + return ImmutableList.of( + testOnEnvironment(EnvSinglenodeDeltaLakeDatabricks113.class) + .withGroups("configured_features", "delta-lake-databricks") + .withExcludedGroups("delta-lake-exclude-113") + .withExcludedTests(getExcludedTests()) + .build()); + } +} diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java index 606ea898c4a8..86c1249bcf50 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java @@ -81,6 +81,7 @@ public final class TestGroups public static final String DELTA_LAKE_DATABRICKS = "delta-lake-databricks"; public static final String DELTA_LAKE_EXCLUDE_73 = "delta-lake-exclude-73"; public static final String DELTA_LAKE_EXCLUDE_91 = "delta-lake-exclude-91"; + public static final String DELTA_LAKE_EXCLUDE_113 = "delta-lake-exclude-113"; public static final String PARQUET = "parquet"; private TestGroups() {} diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java index bed1ed5daeff..86c73cf270c6 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java @@ -28,7 +28,7 @@ import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_91; import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; -import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_91_RUNTIME_VERSION; +import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_91_RUNTIME_VERSION; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getColumnCommentOnDelta; @@ -363,7 +363,7 @@ public void testTrinoAlterTablePreservesGeneratedColumn() onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " ADD COLUMN c INT"); Assertions.assertThat((String) onDelta().executeQuery("SHOW CREATE TABLE default." + tableName).getOnlyValue()) - .contains((getDatabricksRuntimeVersion().equals(DATABRICKS_91_RUNTIME_VERSION) ? "`b`" : "b") + " INT GENERATED ALWAYS AS ( a * 2 )"); + .contains((getDatabricksRuntimeVersion().orElseThrow().equals(DATABRICKS_91_RUNTIME_VERSION) ? "`b`" : "b") + " INT GENERATED ALWAYS AS ( a * 2 )"); onDelta().executeQuery("INSERT INTO default." + tableName + " (a, c) VALUES (1, 3)"); assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) .containsOnly(row(1, 2, 3)); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java index 0880e323aa53..6f10c6ab2965 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java @@ -22,6 +22,7 @@ import io.trino.tempto.BeforeTestWithContext; import io.trino.tempto.assertions.QueryAssert; import io.trino.testng.services.Flaky; +import io.trino.tests.product.deltalake.util.DatabricksVersion; import org.testng.SkipException; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -38,8 +39,9 @@ import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.TransactionLogAssertions.assertLastEntryIsCheckpointed; import static io.trino.tests.product.deltalake.TransactionLogAssertions.assertTransactionLogVersion; -import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_104_RUNTIME_VERSION; -import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_91_RUNTIME_VERSION; +import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_104_RUNTIME_VERSION; +import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_113_RUNTIME_VERSION; +import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_91_RUNTIME_VERSION; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getDatabricksRuntimeVersion; @@ -58,14 +60,14 @@ public class TestDeltaLakeDatabricksCheckpointsCompatibility private String s3ServerType; private AmazonS3 s3; - private String databricksRuntimeVersion; + private DatabricksVersion databricksRuntimeVersion; @BeforeTestWithContext public void setup() { super.setUp(); s3 = new S3ClientFactory().createS3Client(s3ServerType); - databricksRuntimeVersion = getDatabricksRuntimeVersion(); + databricksRuntimeVersion = getDatabricksRuntimeVersion().orElseThrow(); } @Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS}) @@ -206,22 +208,18 @@ public void testDatabricksUsesCheckpointInterval() try { // validate that Databricks can see the checkpoint interval String showCreateTable; - if (databricksRuntimeVersion.equals(DATABRICKS_104_RUNTIME_VERSION)) { + if (databricksRuntimeVersion.isAtLeast(DATABRICKS_104_RUNTIME_VERSION)) { showCreateTable = format( "CREATE TABLE spark_catalog.default.%s (\n" + " a_number BIGINT,\n" + " a_string STRING)\n" + "USING delta\n" + "PARTITIONED BY (a_number)\n" + - "LOCATION 's3://%s/%s'\n" + - "TBLPROPERTIES (\n" + - " 'Type' = 'EXTERNAL',\n" + - " 'delta.checkpointInterval' = '3',\n" + - " 'delta.minReaderVersion' = '1',\n" + - " 'delta.minWriterVersion' = '2')\n", + "LOCATION 's3://%s/%s'\n%s", tableName, bucketName, - tableDirectory); + tableDirectory, + getDatabricksTablePropertiesWithCheckpointInterval()); } else { showCreateTable = format( @@ -261,6 +259,24 @@ public void testDatabricksUsesCheckpointInterval() } } + private String getDatabricksTablePropertiesWithCheckpointInterval() + { + if (databricksRuntimeVersion.equals(DATABRICKS_113_RUNTIME_VERSION)) { + return "TBLPROPERTIES (\n" + + " 'delta.checkpointInterval' = '3',\n" + + " 'delta.minReaderVersion' = '1',\n" + + " 'delta.minWriterVersion' = '2')\n"; + } + if (databricksRuntimeVersion.equals(DATABRICKS_104_RUNTIME_VERSION)) { + return "TBLPROPERTIES (\n" + + " 'Type' = 'EXTERNAL',\n" + + " 'delta.checkpointInterval' = '3',\n" + + " 'delta.minReaderVersion' = '1',\n" + + " 'delta.minWriterVersion' = '2')\n"; + } + throw new IllegalArgumentException("Unsupported databricks runtime version: " + databricksRuntimeVersion); + } + @Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS}) @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testTrinoCheckpointMinMaxStatisticsForRowType() @@ -312,7 +328,7 @@ private void testCheckpointMinMaxStatisticsForRowType(Consumer sqlExecut // Assert min/max queries can be computed from just metadata String explainSelectMax = getOnlyElement(onDelta().executeQuery("EXPLAIN SELECT max(root.entry_one) FROM default." + tableName).column(1)); - String column = databricksRuntimeVersion.equals(DATABRICKS_104_RUNTIME_VERSION) ? "root.entry_one" : "root.entry_one AS `entry_one`"; + String column = databricksRuntimeVersion.isAtLeast(DATABRICKS_104_RUNTIME_VERSION) ? "root.entry_one" : "root.entry_one AS `entry_one`"; assertThat(explainSelectMax).matches("== Physical Plan ==\\s*LocalTableScan \\[max\\(" + column + "\\).*]\\s*"); // check both engines can read both tables @@ -378,7 +394,7 @@ private void testCheckpointNullStatisticsForRowType(Consumer sqlExecutor // Assert counting non null entries can be computed from just metadata String explainCountNotNull = getOnlyElement(onDelta().executeQuery("EXPLAIN SELECT count(root.entry_two) FROM default." + tableName).column(1)); - String column = databricksRuntimeVersion.equals(DATABRICKS_104_RUNTIME_VERSION) ? "root.entry_two" : "root.entry_two AS `entry_two`"; + String column = databricksRuntimeVersion.isAtLeast(DATABRICKS_104_RUNTIME_VERSION) ? "root.entry_two" : "root.entry_two AS `entry_two`"; assertThat(explainCountNotNull).matches("== Physical Plan ==\\s*LocalTableScan \\[count\\(" + column + "\\).*]\\s*"); // check both engines can read both tables @@ -509,7 +525,7 @@ private void testWriteStatsAsJsonEnabled(Consumer sqlExecutor, String ta " delta.checkpoint.writeStatsAsStruct = true)", tableName, type, bucketName); - if (getDatabricksRuntimeVersion().equals(DATABRICKS_91_RUNTIME_VERSION) && type.equals("struct")) { + if (databricksRuntimeVersion.equals(DATABRICKS_91_RUNTIME_VERSION) && type.equals("struct")) { assertThatThrownBy(() -> onDelta().executeQuery(createTableSql)).hasStackTraceContaining("ParseException"); throw new SkipException("New runtime version covers the type"); } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java index 24ed53f9776b..023a35a8970f 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java @@ -36,6 +36,7 @@ import static io.trino.tempto.assertions.QueryAssert.Row.row; import static io.trino.tempto.assertions.QueryAssert.assertThat; import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_113; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.TransactionLogAssertions.assertLastEntryIsCheckpointed; import static io.trino.tests.product.deltalake.TransactionLogAssertions.assertTransactionLogVersion; @@ -226,7 +227,8 @@ public void testReplaceTableWithSchemaChange() } } - @Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS}) + // Databricks 11.3 doesn't create a checkpoint file at 'CREATE OR REPLACE TABLE' statement + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testReplaceTableWithSchemaChangeOnCheckpoint() { diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableCompatibility.java index de29e208f66f..2f1d111868ee 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableCompatibility.java @@ -17,6 +17,7 @@ import io.trino.tempto.BeforeTestWithContext; import io.trino.tempto.assertions.QueryAssert; import io.trino.testng.services.Flaky; +import io.trino.tests.product.deltalake.util.DatabricksVersion; import org.testng.annotations.Test; import java.util.List; @@ -25,7 +26,8 @@ import static io.trino.tempto.assertions.QueryAssert.assertThat; import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; -import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_104_RUNTIME_VERSION; +import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_104_RUNTIME_VERSION; +import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_113_RUNTIME_VERSION; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getColumnCommentOnDelta; @@ -41,13 +43,13 @@ public class TestDeltaLakeDatabricksCreateTableCompatibility extends BaseTestDeltaLakeS3Storage { - private String databricksRuntimeVersion; + private DatabricksVersion databricksRuntimeVersion; @BeforeTestWithContext public void setup() { super.setUp(); - databricksRuntimeVersion = getDatabricksRuntimeVersion(); + databricksRuntimeVersion = getDatabricksRuntimeVersion().orElseThrow(); } @Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS}) @@ -66,13 +68,13 @@ public void testDatabricksCanReadInitialCreateTable() assertThat(onDelta().executeQuery("SHOW TABLES FROM default LIKE '" + tableName + "'")).contains(row("default", tableName, false)); assertThat(onDelta().executeQuery("SELECT count(*) FROM default." + tableName)).contains(row(0)); String showCreateTable; - if (databricksRuntimeVersion.equals(DATABRICKS_104_RUNTIME_VERSION)) { + if (databricksRuntimeVersion.isAtLeast(DATABRICKS_104_RUNTIME_VERSION)) { showCreateTable = format( "CREATE TABLE spark_catalog.default.%s (\n integer INT,\n string STRING,\n timetz TIMESTAMP)\nUSING delta\nLOCATION 's3://%s/%s'\n%s", tableName, bucketName, tableDirectory, - getDatabricks104DefaultTableProperties()); + getDatabricksDefaultTableProperties()); } else { showCreateTable = format( @@ -108,14 +110,14 @@ public void testDatabricksCanReadInitialCreatePartitionedTable() assertThat(onDelta().executeQuery("SHOW TABLES LIKE '" + tableName + "'")).contains(row("default", tableName, false)); assertThat(onDelta().executeQuery("SELECT count(*) FROM " + tableName)).contains(row(0)); String showCreateTable; - if (databricksRuntimeVersion.equals(DATABRICKS_104_RUNTIME_VERSION)) { + if (databricksRuntimeVersion.isAtLeast(DATABRICKS_104_RUNTIME_VERSION)) { showCreateTable = format( "CREATE TABLE spark_catalog.default.%s (\n integer INT,\n string STRING,\n timetz TIMESTAMP)\nUSING delta\n" + "PARTITIONED BY (string)\nLOCATION 's3://%s/%s'\n%s", tableName, bucketName, tableDirectory, - getDatabricks104DefaultTableProperties()); + getDatabricksDefaultTableProperties()); } else { showCreateTable = format( @@ -150,13 +152,13 @@ public void testDatabricksCanReadInitialCreateTableAs() assertThat(onDelta().executeQuery("SHOW TABLES FROM default LIKE '" + tableName + "'")).contains(row("default", tableName, false)); assertThat(onDelta().executeQuery("SELECT count(*) FROM default." + tableName)).contains(row(3)); String showCreateTable; - if (databricksRuntimeVersion.equals(DATABRICKS_104_RUNTIME_VERSION)) { + if (databricksRuntimeVersion.isAtLeast(DATABRICKS_104_RUNTIME_VERSION)) { showCreateTable = format( "CREATE TABLE spark_catalog.default.%s (\n integer INT,\n string STRING,\n timetz TIMESTAMP)\nUSING delta\nLOCATION 's3://%s/%s'\n%s", tableName, bucketName, tableDirectory, - getDatabricks104DefaultTableProperties()); + getDatabricksDefaultTableProperties()); } else { showCreateTable = format( @@ -195,14 +197,14 @@ public void testDatabricksCanReadInitialCreatePartitionedTableAs() assertThat(onDelta().executeQuery("SHOW TABLES LIKE '" + tableName + "'")).contains(row("default", tableName, false)); assertThat(onDelta().executeQuery("SELECT count(*) FROM " + tableName)).contains(row(3)); String showCreateTable; - if (databricksRuntimeVersion.equals(DATABRICKS_104_RUNTIME_VERSION)) { + if (databricksRuntimeVersion.isAtLeast(DATABRICKS_104_RUNTIME_VERSION)) { showCreateTable = format( "CREATE TABLE spark_catalog.default.%s (\n integer INT,\n string STRING,\n timetz TIMESTAMP)\nUSING delta\n" + "PARTITIONED BY (string)\nLOCATION 's3://%s/%s'\n%s", tableName, bucketName, tableDirectory, - getDatabricks104DefaultTableProperties()); + getDatabricksDefaultTableProperties()); } else { showCreateTable = format( @@ -310,11 +312,19 @@ public void testCreateTableWithColumnCommentOnDelta() } } - private String getDatabricks104DefaultTableProperties() + private String getDatabricksDefaultTableProperties() { - return "TBLPROPERTIES (\n" + - " 'Type' = 'EXTERNAL',\n" + - " 'delta.minReaderVersion' = '1',\n" + - " 'delta.minWriterVersion' = '2')\n"; + if (databricksRuntimeVersion.equals(DATABRICKS_113_RUNTIME_VERSION)) { + return "TBLPROPERTIES (\n" + + " 'delta.minReaderVersion' = '1',\n" + + " 'delta.minWriterVersion' = '2')\n"; + } + if (databricksRuntimeVersion.equals(DATABRICKS_104_RUNTIME_VERSION)) { + return "TBLPROPERTIES (\n" + + " 'Type' = 'EXTERNAL',\n" + + " 'delta.minReaderVersion' = '1',\n" + + " 'delta.minWriterVersion' = '2')\n"; + } + throw new IllegalArgumentException("Unsupported databricks runtime version: " + databricksRuntimeVersion); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java index ba32b5adeae0..713f97a684b4 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java @@ -17,10 +17,12 @@ import io.trino.tempto.BeforeTestWithContext; import io.trino.tempto.assertions.QueryAssert.Row; import io.trino.testng.services.Flaky; +import io.trino.tests.product.deltalake.util.DatabricksVersion; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.util.List; +import java.util.Optional; import java.util.stream.Stream; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -32,7 +34,7 @@ import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_73; import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; -import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_104_RUNTIME_VERSION; +import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_104_RUNTIME_VERSION; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getDatabricksRuntimeVersion; @@ -44,7 +46,7 @@ public class TestDeltaLakeDatabricksInsertCompatibility extends BaseTestDeltaLakeS3Storage { - private String databricksRuntimeVersion; + private Optional databricksRuntimeVersion; @BeforeTestWithContext public void setup() @@ -411,7 +413,7 @@ private void testCompression(boolean optimizedWriter, String compressionCodec) assertThat(onTrino().executeQuery("SELECT * FROM " + trinoTableName)) .containsOnly(expected); - if ("ZSTD".equals(compressionCodec) && !databricksRuntimeVersion.equals(DATABRICKS_104_RUNTIME_VERSION)) { + if ("ZSTD".equals(compressionCodec) && databricksRuntimeVersion.orElseThrow().isOlderThan(DATABRICKS_104_RUNTIME_VERSION)) { assertQueryFailure(() -> onDelta().executeQuery("SELECT * FROM default." + tableName)) .hasMessageContaining("java.lang.ClassNotFoundException: org.apache.hadoop.io.compress.ZStandardCodec"); } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeTransactionLogCache.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeTransactionLogCache.java index 8a439bc5731c..b7d2dd95254a 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeTransactionLogCache.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeTransactionLogCache.java @@ -63,7 +63,7 @@ public void testAllDataFilesAreLoadedWhenTransactionLogFileAfterTheCachedTableVe String tableName = "test_dl_cached_table_files_accuracy_" + randomTableSuffix(); String tableDirectory = "databricks-compatibility-test-" + tableName; - onTrino().executeQuery(format("CREATE TABLE delta.default.%s (col INT) WITH (location = 's3://%s/%s')", + onTrino().executeQuery(format("CREATE TABLE delta.default.%s (col INT) WITH (location = 's3://%s/%s', checkpoint_interval = 10)", tableName, bucketName, tableDirectory)); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DatabricksVersion.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DatabricksVersion.java new file mode 100644 index 000000000000..bcf548ab7143 --- /dev/null +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DatabricksVersion.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.deltalake.util; + +import com.google.common.collect.ComparisonChain; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public record DatabricksVersion(int majorVersion, int minorVersion) + implements Comparable +{ + public static final DatabricksVersion DATABRICKS_113_RUNTIME_VERSION = new DatabricksVersion(11, 3); + public static final DatabricksVersion DATABRICKS_104_RUNTIME_VERSION = new DatabricksVersion(10, 4); + public static final DatabricksVersion DATABRICKS_91_RUNTIME_VERSION = new DatabricksVersion(9, 1); + + private static final Pattern DATABRICKS_VERSION_PATTERN = Pattern.compile("(\\d+)\\.(\\d+)"); + + public boolean isAtLeast(DatabricksVersion version) + { + return compareTo(version) >= 0; + } + + public boolean isOlderThan(DatabricksVersion version) + { + return compareTo(version) < 0; + } + + @Override + public int compareTo(DatabricksVersion other) + { + return ComparisonChain.start() + .compare(majorVersion, other.majorVersion) + .compare(minorVersion, other.minorVersion) + .result(); + } + + public static DatabricksVersion parse(String versionString) + { + Matcher matcher = DATABRICKS_VERSION_PATTERN.matcher(versionString); + if (!matcher.matches()) { + throw new IllegalArgumentException("Cannot parse Databricks version " + versionString); + } + int majorVersion = Integer.parseInt(matcher.group(1)); + int minorVersion = Integer.parseInt(matcher.group(2)); + return new DatabricksVersion(majorVersion, minorVersion); + } +} diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java index ae07381813bb..2eb7e31ca284 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java @@ -16,16 +16,14 @@ import io.trino.tempto.query.QueryResult; import org.intellij.lang.annotations.Language; -import static com.google.common.base.MoreObjects.firstNonNull; +import java.util.Optional; + import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; public final class DeltaLakeTestUtils { - public static final String DATABRICKS_104_RUNTIME_VERSION = "10.4"; - public static final String DATABRICKS_91_RUNTIME_VERSION = "9.1"; - public static final String DATABRICKS_COMMUNICATION_FAILURE_ISSUE = "https://github.com/trinodb/trino/issues/14391"; @Language("RegExp") public static final String DATABRICKS_COMMUNICATION_FAILURE_MATCH = @@ -33,9 +31,14 @@ public final class DeltaLakeTestUtils private DeltaLakeTestUtils() {} - public static String getDatabricksRuntimeVersion() + public static Optional getDatabricksRuntimeVersion() { - return firstNonNull((String) onDelta().executeQuery("SELECT java_method('java.lang.System', 'getenv', 'DATABRICKS_RUNTIME_VERSION')").getOnlyValue(), "unknown"); + String version = (String) onDelta().executeQuery("SELECT java_method('java.lang.System', 'getenv', 'DATABRICKS_RUNTIME_VERSION')").getOnlyValue(); + // OSS Spark returns null + if (version.equals("null")) { + return Optional.empty(); + } + return Optional.of(DatabricksVersion.parse(version)); } public static String getColumnCommentOnTrino(String schemaName, String tableName, String columnName)