Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,19 @@
*/
package io.trino.tests.product.deltalake;

import com.amazonaws.services.s3.AmazonS3;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.trino.tempto.BeforeTestWithContext;
import io.trino.tempto.assertions.QueryAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.SoftAssertions;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure;
Expand All @@ -38,6 +44,19 @@
public class TestDeltaLakeWriteDatabricksCompatibility
extends BaseTestDeltaLakeS3Storage
{
// Value bound to the "s3.server_type" name; injected by the test framework and
// passed to S3ClientFactory when the client is created in setup().
@Inject
@Named("s3.server_type")
private String s3ServerType;

// S3 client used by the tests to list objects in the bucket; assigned in setup().
private AmazonS3 s3;

/**
 * Prepares per-test state: delegates to the parent class set-up and then
 * builds the S3 client for the injected server type.
 */
@BeforeTestWithContext
public void setup()
{
    // Run the parent initialization before creating anything that may depend on it
    super.setUp();

    S3ClientFactory clientFactory = new S3ClientFactory();
    s3 = clientFactory.createS3Client(s3ServerType);
}

@Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS})
public void testUpdateCompatibility()
{
Expand Down Expand Up @@ -286,6 +305,53 @@ public void testInsertingIntoDatabricksTableWithAddedNotNullConstraint()
}
}

/**
 * Runs the shared change data feed vacuum scenario with the vacuum issued through Trino.
 */
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS})
public void testTrinoVacuumRemoveChangeDataFeedFiles()
{
    Consumer<String> trinoVacuum = table -> {
        // The session property is set to '0s' first; presumably this is what allows
        // the zero-second retention passed to the vacuum procedure below
        onTrino().executeQuery("SET SESSION delta.vacuum_min_retention = '0s'");
        String vacuumCall = String.format("CALL delta.system.vacuum('default', '%s', '0s')", table);
        onTrino().executeQuery(vacuumCall);
    };

    testVacuumRemoveChangeDataFeedFiles(trinoVacuum);
}

/**
 * Runs the shared change data feed vacuum scenario with the vacuum issued through Databricks.
 */
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS})
public void testDatabricksVacuumRemoveChangeDataFeedFiles()
{
    Consumer<String> databricksVacuum = table -> {
        // The retention duration check is switched off before vacuuming with
        // RETAIN 0 HOURS; presumably the statement would be rejected otherwise
        onDelta().executeQuery("SET spark.databricks.delta.retentionDurationCheck.enabled = false");
        String vacuumStatement = String.format("VACUUM default.%s RETAIN 0 HOURS", table);
        onDelta().executeQuery(vacuumStatement);
    };

    testVacuumRemoveChangeDataFeedFiles(databricksVacuum);
}

/**
 * Shared scenario checking that a vacuum removes the files kept under the table's
 * {@code _change_data} directory.
 * <p>
 * A Delta table with change data feed enabled is created through Databricks at a
 * dedicated S3 location, an INSERT followed by an UPDATE is executed, and the objects
 * under the {@code _change_data} prefix are counted before and after the vacuum.
 * The table is dropped in all cases once it has been created.
 *
 * @param vacuumExecutor receives the (unqualified) table name in schema {@code default}
 *        and is responsible for running the vacuum against it
 */
private void testVacuumRemoveChangeDataFeedFiles(Consumer<String> vacuumExecutor)
{
    String tableName = "test_vacuum_ignore_cdf_" + randomTableSuffix();
    String directoryName = "databricks-compatibility-test-" + tableName;
    String changeDataPrefix = directoryName + "/_change_data";

    // Note the space after the LOCATION literal: the clauses are concatenated into one statement
    onDelta().executeQuery("CREATE TABLE default." + tableName + " (a INT) " +
            "USING DELTA " +
            "LOCATION 's3://" + bucketName + "/" + directoryName + "' " +
            "TBLPROPERTIES (delta.enableChangeDataFeed = true)");

    try {
        // Executing some statements to create _change_data directory.
        // The table is schema-qualified, consistently with CREATE, DROP and the vacuum callers.
        onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1)");
        onDelta().executeQuery("UPDATE default." + tableName + " SET a = 2");

        // Exactly one object is expected under the _change_data prefix at this point
        Assertions.assertThat(s3.listObjectsV2(bucketName, changeDataPrefix).getObjectSummaries()).hasSize(1);

        // Vacuum procedure should remove files in _change_data directory
        // https://docs.delta.io/2.1.0/delta-change-data-feed.html#change-data-storage
        vacuumExecutor.accept(tableName);

        Assertions.assertThat(s3.listObjectsV2(bucketName, changeDataPrefix).getObjectSummaries()).isEmpty();
    }
    finally {
        onDelta().executeQuery("DROP TABLE default." + tableName);
    }
}

@DataProvider(name = "partition_column_names")
public static Object[][] partitionColumns()
{
Expand Down