Merged
8 changes: 8 additions & 0 deletions .github/workflows/ci.yml
@@ -698,6 +698,7 @@ jobs:
- suite-delta-lake-databricks73
- suite-delta-lake-databricks91
- suite-delta-lake-databricks104
- suite-delta-lake-databricks113
- suite-gcs
exclude:
- config: default
@@ -737,6 +738,11 @@ jobs:
- suite: suite-delta-lake-databricks104
ignore exclusion if: >-
${{ secrets.DATABRICKS_TOKEN != '' }}
- suite: suite-delta-lake-databricks113
config: hdp3
- suite: suite-delta-lake-databricks113
ignore exclusion if: >-
${{ secrets.DATABRICKS_TOKEN != '' }}
Comment on lines 741 to 745

Member: Why is this two entries?

Member Author: It's under the exclude section. In my understanding, two entries are required: one to always exclude the hdp3 config, and another to exclude the suite unless the DATABRICKS_TOKEN secret exists.
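To illustrate the mechanics, the relevant fragment of the matrix configuration looks roughly like this (a simplified sketch based on the diff above; note that `ignore exclusion if` is a custom key interpreted by a later filtering step in this workflow, not standard GitHub Actions syntax):

```yaml
strategy:
  matrix:
    exclude:
      # Entry 1: always skip this suite on the hdp3 config
      - suite: suite-delta-lake-databricks113
        config: hdp3
      # Entry 2: skip the suite on all configs, unless the
      # Databricks secret is available to this workflow run
      - suite: suite-delta-lake-databricks113
        ignore exclusion if: >-
          ${{ secrets.DATABRICKS_TOKEN != '' }}
```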

Member: @nineinchnick @hashhar please review the ci.yml changes

Member: LGTM. Note it adds quite a lot of time to the total CI runtime: [image]

Do we need to test all LTSes?

Member: We've brought up EOLing the tests for version 7.3. Maybe it's time, though Databricks is supporting it through March.


ignore exclusion if:
# Do not use this property outside of the matrix configuration.
@@ -811,6 +817,7 @@ jobs:
DATABRICKS_73_JDBC_URL:
DATABRICKS_91_JDBC_URL:
DATABRICKS_104_JDBC_URL:
DATABRICKS_113_JDBC_URL:
DATABRICKS_LOGIN:
DATABRICKS_TOKEN:
GCP_CREDENTIALS_KEY:
@@ -875,6 +882,7 @@ jobs:
DATABRICKS_73_JDBC_URL: ${{ secrets.DATABRICKS_73_JDBC_URL }}
DATABRICKS_91_JDBC_URL: ${{ secrets.DATABRICKS_91_JDBC_URL }}
DATABRICKS_104_JDBC_URL: ${{ secrets.DATABRICKS_104_JDBC_URL }}
DATABRICKS_113_JDBC_URL: ${{ secrets.DATABRICKS_113_JDBC_URL }}
DATABRICKS_LOGIN: token
DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
GCP_CREDENTIALS_KEY: ${{ secrets.GCP_CREDENTIALS_KEY }}
2 changes: 1 addition & 1 deletion docs/src/main/sphinx/connector/delta-lake.rst
@@ -16,7 +16,7 @@ Requirements

To connect to Databricks Delta Lake, you need:

* Tables written by Databricks Runtime 7.3 LTS, 9.1 LTS and 10.4 LTS are supported.
* Tables written by Databricks Runtime 7.3 LTS, 9.1 LTS, 10.4 LTS and 11.3 LTS are supported.
* Deployments using AWS, HDFS, Azure Storage, and Google Cloud Storage (GCS) are
fully supported.
* Network access from the coordinator and workers to the Delta Lake storage.
@@ -0,0 +1,38 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.tests.product.launcher.env.environment;

import com.google.inject.Inject;
import io.trino.tests.product.launcher.docker.DockerFiles;
import io.trino.tests.product.launcher.env.common.Standard;
import io.trino.tests.product.launcher.env.common.TestsEnvironment;

import static java.util.Objects.requireNonNull;

@TestsEnvironment
public class EnvSinglenodeDeltaLakeDatabricks113
extends AbstractSinglenodeDeltaLakeDatabricks
{
@Inject
public EnvSinglenodeDeltaLakeDatabricks113(Standard standard, DockerFiles dockerFiles)
{
super(standard, dockerFiles);
}

@Override
String databricksTestJdbcUrl()
{
return requireNonNull(System.getenv("DATABRICKS_113_JDBC_URL"), "Environment DATABRICKS_113_JDBC_URL was not set");
}
}
@@ -0,0 +1,39 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.tests.product.launcher.suite.suites;

import com.google.common.collect.ImmutableList;
import io.trino.tests.product.launcher.env.EnvironmentConfig;
import io.trino.tests.product.launcher.env.environment.EnvSinglenodeDeltaLakeDatabricks113;
import io.trino.tests.product.launcher.suite.SuiteDeltaLakeDatabricks;
import io.trino.tests.product.launcher.suite.SuiteTestRun;

import java.util.List;

import static io.trino.tests.product.launcher.suite.SuiteTestRun.testOnEnvironment;

public class SuiteDeltaLakeDatabricks113
extends SuiteDeltaLakeDatabricks
{
@Override
public List<SuiteTestRun> getTestRuns(EnvironmentConfig config)
{
return ImmutableList.of(
testOnEnvironment(EnvSinglenodeDeltaLakeDatabricks113.class)
.withGroups("configured_features", "delta-lake-databricks")
.withExcludedGroups("delta-lake-exclude-113")
.withExcludedTests(getExcludedTests())
.build());
}
}
@@ -81,6 +81,7 @@ public final class TestGroups
public static final String DELTA_LAKE_DATABRICKS = "delta-lake-databricks";
public static final String DELTA_LAKE_EXCLUDE_73 = "delta-lake-exclude-73";
public static final String DELTA_LAKE_EXCLUDE_91 = "delta-lake-exclude-91";
public static final String DELTA_LAKE_EXCLUDE_113 = "delta-lake-exclude-113";
public static final String PARQUET = "parquet";

private TestGroups() {}
@@ -28,7 +28,7 @@
import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_91;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS;
import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_91_RUNTIME_VERSION;
import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_91_RUNTIME_VERSION;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getColumnCommentOnDelta;
@@ -363,7 +363,7 @@ public void testTrinoAlterTablePreservesGeneratedColumn()
onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " ADD COLUMN c INT");

Assertions.assertThat((String) onDelta().executeQuery("SHOW CREATE TABLE default." + tableName).getOnlyValue())
.contains((getDatabricksRuntimeVersion().equals(DATABRICKS_91_RUNTIME_VERSION) ? "`b`" : "b") + " INT GENERATED ALWAYS AS ( a * 2 )");
.contains((getDatabricksRuntimeVersion().orElseThrow().equals(DATABRICKS_91_RUNTIME_VERSION) ? "`b`" : "b") + " INT GENERATED ALWAYS AS ( a * 2 )");
onDelta().executeQuery("INSERT INTO default." + tableName + " (a, c) VALUES (1, 3)");
assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
.containsOnly(row(1, 2, 3));
@@ -22,6 +22,7 @@
import io.trino.tempto.BeforeTestWithContext;
import io.trino.tempto.assertions.QueryAssert;
import io.trino.testng.services.Flaky;
import io.trino.tests.product.deltalake.util.DatabricksVersion;
import org.testng.SkipException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -38,8 +39,9 @@
import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
import static io.trino.tests.product.deltalake.TransactionLogAssertions.assertLastEntryIsCheckpointed;
import static io.trino.tests.product.deltalake.TransactionLogAssertions.assertTransactionLogVersion;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_104_RUNTIME_VERSION;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_91_RUNTIME_VERSION;
import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_104_RUNTIME_VERSION;
import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_113_RUNTIME_VERSION;
import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_91_RUNTIME_VERSION;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getDatabricksRuntimeVersion;
@@ -58,14 +60,14 @@ public class TestDeltaLakeDatabricksCheckpointsCompatibility
private String s3ServerType;

private AmazonS3 s3;
private String databricksRuntimeVersion;
private DatabricksVersion databricksRuntimeVersion;

@BeforeTestWithContext
public void setup()
{
super.setUp();
s3 = new S3ClientFactory().createS3Client(s3ServerType);
databricksRuntimeVersion = getDatabricksRuntimeVersion();
databricksRuntimeVersion = getDatabricksRuntimeVersion().orElseThrow();
}

@Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS})
@@ -206,22 +208,18 @@ public void testDatabricksUsesCheckpointInterval()
try {
// validate that Databricks can see the checkpoint interval
String showCreateTable;
if (databricksRuntimeVersion.equals(DATABRICKS_104_RUNTIME_VERSION)) {
if (databricksRuntimeVersion.isAtLeast(DATABRICKS_104_RUNTIME_VERSION)) {
showCreateTable = format(
"CREATE TABLE spark_catalog.default.%s (\n" +
" a_number BIGINT,\n" +
" a_string STRING)\n" +
"USING delta\n" +
"PARTITIONED BY (a_number)\n" +
"LOCATION 's3://%s/%s'\n" +
"TBLPROPERTIES (\n" +
" 'Type' = 'EXTERNAL',\n" +
" 'delta.checkpointInterval' = '3',\n" +
" 'delta.minReaderVersion' = '1',\n" +
" 'delta.minWriterVersion' = '2')\n",
"LOCATION 's3://%s/%s'\n%s",
tableName,
bucketName,
tableDirectory);
tableDirectory,
getDatabricksTablePropertiesWithCheckpointInterval());
}
else {
showCreateTable = format(
@@ -261,6 +259,24 @@ public void testDatabricksUsesCheckpointInterval()
}
}

private String getDatabricksTablePropertiesWithCheckpointInterval()
{
if (databricksRuntimeVersion.equals(DATABRICKS_113_RUNTIME_VERSION)) {
return "TBLPROPERTIES (\n" +
" 'delta.checkpointInterval' = '3',\n" +
" 'delta.minReaderVersion' = '1',\n" +
" 'delta.minWriterVersion' = '2')\n";
}
if (databricksRuntimeVersion.equals(DATABRICKS_104_RUNTIME_VERSION)) {
return "TBLPROPERTIES (\n" +
" 'Type' = 'EXTERNAL',\n" +
" 'delta.checkpointInterval' = '3',\n" +
" 'delta.minReaderVersion' = '1',\n" +
" 'delta.minWriterVersion' = '2')\n";
}
throw new IllegalArgumentException("Unsupported databricks runtime version: " + databricksRuntimeVersion);
}
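The diff switches several checks from string equality to `DatabricksVersion.isAtLeast`, and `getDatabricksRuntimeVersion()` now returns an `Optional` (hence the `orElseThrow()` calls). The `DatabricksVersion` class itself is not shown in this diff; a minimal comparable-version sketch, with hypothetical field and constructor names, might look like:

```java
// Hypothetical sketch: the real DatabricksVersion lives in
// io.trino.tests.product.deltalake.util and may differ in shape.
public class DatabricksVersion
        implements Comparable<DatabricksVersion>
{
    private final int majorVersion;
    private final int minorVersion;

    public DatabricksVersion(int majorVersion, int minorVersion)
    {
        this.majorVersion = majorVersion;
        this.minorVersion = minorVersion;
    }

    // Enables range checks like "runs on 10.4 and newer" instead of
    // enumerating every matching version with equals()
    public boolean isAtLeast(DatabricksVersion other)
    {
        return compareTo(other) >= 0;
    }

    @Override
    public int compareTo(DatabricksVersion other)
    {
        if (majorVersion != other.majorVersion) {
            return Integer.compare(majorVersion, other.majorVersion);
        }
        return Integer.compare(minorVersion, other.minorVersion);
    }

    public static void main(String[] args)
    {
        DatabricksVersion v104 = new DatabricksVersion(10, 4);
        DatabricksVersion v113 = new DatabricksVersion(11, 3);
        System.out.println(v113.isAtLeast(v104)); // prints "true"
        System.out.println(v104.isAtLeast(v113)); // prints "false"
    }
}
```

Using `isAtLeast` means future runtimes (for example a 12.x LTS) pick up the newer behavior without another code change, which is why the diff prefers it over `equals` where the behavior is "this version and later".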

@Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testTrinoCheckpointMinMaxStatisticsForRowType()
@@ -312,7 +328,7 @@ private void testCheckpointMinMaxStatisticsForRowType(Consumer<String> sqlExecut

// Assert min/max queries can be computed from just metadata
String explainSelectMax = getOnlyElement(onDelta().executeQuery("EXPLAIN SELECT max(root.entry_one) FROM default." + tableName).column(1));
String column = databricksRuntimeVersion.equals(DATABRICKS_104_RUNTIME_VERSION) ? "root.entry_one" : "root.entry_one AS `entry_one`";
String column = databricksRuntimeVersion.isAtLeast(DATABRICKS_104_RUNTIME_VERSION) ? "root.entry_one" : "root.entry_one AS `entry_one`";
assertThat(explainSelectMax).matches("== Physical Plan ==\\s*LocalTableScan \\[max\\(" + column + "\\).*]\\s*");

// check both engines can read both tables
@@ -378,7 +394,7 @@ private void testCheckpointNullStatisticsForRowType(Consumer<String> sqlExecutor

// Assert counting non null entries can be computed from just metadata
String explainCountNotNull = getOnlyElement(onDelta().executeQuery("EXPLAIN SELECT count(root.entry_two) FROM default." + tableName).column(1));
String column = databricksRuntimeVersion.equals(DATABRICKS_104_RUNTIME_VERSION) ? "root.entry_two" : "root.entry_two AS `entry_two`";
String column = databricksRuntimeVersion.isAtLeast(DATABRICKS_104_RUNTIME_VERSION) ? "root.entry_two" : "root.entry_two AS `entry_two`";
assertThat(explainCountNotNull).matches("== Physical Plan ==\\s*LocalTableScan \\[count\\(" + column + "\\).*]\\s*");

// check both engines can read both tables
@@ -509,7 +525,7 @@ private void testWriteStatsAsJsonEnabled(Consumer<String> sqlExecutor, String ta
" delta.checkpoint.writeStatsAsStruct = true)",
tableName, type, bucketName);

if (getDatabricksRuntimeVersion().equals(DATABRICKS_91_RUNTIME_VERSION) && type.equals("struct<x bigint>")) {
if (databricksRuntimeVersion.equals(DATABRICKS_91_RUNTIME_VERSION) && type.equals("struct<x bigint>")) {
assertThatThrownBy(() -> onDelta().executeQuery(createTableSql)).hasStackTraceContaining("ParseException");
throw new SkipException("New runtime version covers the type");
}
@@ -36,6 +36,7 @@
import static io.trino.tempto.assertions.QueryAssert.Row.row;
import static io.trino.tempto.assertions.QueryAssert.assertThat;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_113;
import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
import static io.trino.tests.product.deltalake.TransactionLogAssertions.assertLastEntryIsCheckpointed;
import static io.trino.tests.product.deltalake.TransactionLogAssertions.assertTransactionLogVersion;
@@ -226,7 +227,8 @@ public void testReplaceTableWithSchemaChange()
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS})
// Databricks 11.3 doesn't create a checkpoint file at 'CREATE OR REPLACE TABLE' statement
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
Comment on lines 230 to 231

Member: But that doesn't mean we should exclude the test, does it?

Member Author: What we could do in 11.3 is verify that a checkpoint file wasn't created by CREATE OR REPLACE TABLE and then skip, but I don't feel such logic is helpful in this test method. The test objective is accomplished by the other versions. If we want to verify Databricks behavior (which operations create a checkpoint file and which don't), adding dedicated tests looks better to me.

Member: Can CREATE OR REPLACE TABLE create a checkpoint in 11.3? Can we force it to do that?

Note that the test specifically wants to cover the scenario where the checkpoint includes a schema change.

Member Author: Unfortunately, I couldn't find a way to make that statement create a checkpoint in 11.3, whether via options, alternative procedures, or anything else.

@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testReplaceTableWithSchemaChangeOnCheckpoint()
{