diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java
index 8dee06830e5e..c68af9192544 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java
@@ -13,13 +13,19 @@
  */
 package io.trino.tests.product.deltalake;
 
+import com.amazonaws.services.s3.AmazonS3;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import io.trino.tempto.BeforeTestWithContext;
 import io.trino.tempto.assertions.QueryAssert;
+import org.assertj.core.api.Assertions;
 import org.assertj.core.api.SoftAssertions;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.function.Consumer;
 import java.util.stream.Stream;
 
 import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure;
@@ -38,6 +44,19 @@
 public class TestDeltaLakeWriteDatabricksCompatibility
         extends BaseTestDeltaLakeS3Storage
 {
+    @Inject
+    @Named("s3.server_type")
+    private String s3ServerType;
+
+    private AmazonS3 s3;
+
+    @BeforeTestWithContext
+    public void setup()
+    {
+        super.setUp();
+        s3 = new S3ClientFactory().createS3Client(s3ServerType);
+    }
+
     @Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS})
     public void testUpdateCompatibility()
     {
@@ -286,6 +305,53 @@ public void testInsertingIntoDatabricksTableWithAddedNotNullConstraint()
         }
     }
 
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS})
+    public void testTrinoVacuumRemoveChangeDataFeedFiles()
+    {
+        testVacuumRemoveChangeDataFeedFiles(tableName -> {
+            onTrino().executeQuery("SET SESSION delta.vacuum_min_retention = '0s'");
+            onTrino().executeQuery("CALL delta.system.vacuum('default', '" + tableName + "', '0s')");
+        });
+    }
+
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS})
+    public void testDatabricksVacuumRemoveChangeDataFeedFiles()
+    {
+        testVacuumRemoveChangeDataFeedFiles(tableName -> {
+            onDelta().executeQuery("SET spark.databricks.delta.retentionDurationCheck.enabled = false");
+            onDelta().executeQuery("VACUUM default." + tableName + " RETAIN 0 HOURS");
+        });
+    }
+
+    private void testVacuumRemoveChangeDataFeedFiles(Consumer<String> vacuumExecutor)
+    {
+        String tableName = "test_vacuum_ignore_cdf_" + randomTableSuffix();
+        String directoryName = "databricks-compatibility-test-" + tableName;
+        String changeDataPrefix = directoryName + "/_change_data";
+
+        onDelta().executeQuery("CREATE TABLE default." + tableName + " (a INT) " +
+                "USING DELTA " +
+                "LOCATION 's3://" + bucketName + "/" + directoryName + "' " +
+                "TBLPROPERTIES (delta.enableChangeDataFeed = true)");
+
+        try {
+            // Execute some statements to create the _change_data directory
+            onDelta().executeQuery("INSERT INTO " + tableName + " VALUES (1)");
+            onDelta().executeQuery("UPDATE " + tableName + " SET a = 2");
+
+            Assertions.assertThat(s3.listObjectsV2(bucketName, changeDataPrefix).getObjectSummaries()).hasSize(1);
+
+            // The vacuum procedure should remove the files in the _change_data directory
+            // https://docs.delta.io/2.1.0/delta-change-data-feed.html#change-data-storage
+            vacuumExecutor.accept(tableName);
+
+            Assertions.assertThat(s3.listObjectsV2(bucketName, changeDataPrefix).getObjectSummaries()).hasSize(0);
+        }
+        finally {
+            onDelta().executeQuery("DROP TABLE default." + tableName);
+        }
+    }
+
     @DataProvider(name = "partition_column_names")
     public static Object[][] partitionColumns()
     {