diff --git a/integration-tests/build.gradle.kts b/integration-tests/build.gradle.kts
index 2ef897d13f..5a621a0251 100644
--- a/integration-tests/build.gradle.kts
+++ b/integration-tests/build.gradle.kts
@@ -51,6 +51,9 @@ dependencies {
   implementation("org.testcontainers:testcontainers")
   implementation(libs.s3mock.testcontainers)
 
+  implementation(platform(libs.awssdk.bom))
+  implementation("software.amazon.awssdk:s3")
+
   implementation("org.apache.iceberg:iceberg-spark-3.5_2.12")
   implementation("org.apache.iceberg:iceberg-spark-extensions-3.5_2.12")
   implementation("org.apache.spark:spark-sql_2.12:3.5.6") {
diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/ext/PolarisSparkIntegrationTestBase.java b/integration-tests/src/main/java/org/apache/polaris/service/it/ext/PolarisSparkIntegrationTestBase.java
index 9ab63aa503..d99089179c 100644
--- a/integration-tests/src/main/java/org/apache/polaris/service/it/ext/PolarisSparkIntegrationTestBase.java
+++ b/integration-tests/src/main/java/org/apache/polaris/service/it/ext/PolarisSparkIntegrationTestBase.java
@@ -49,6 +49,12 @@
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
 
 @ExtendWith(PolarisIntegrationTestExtension.class)
 public abstract class PolarisSparkIntegrationTestBase {
@@ -64,6 +70,7 @@ public abstract class PolarisSparkIntegrationTestBase {
   protected String catalogName;
   protected String externalCatalogName;
   protected URI warehouseDir;
+  protected S3Client s3Client;
 
   @BeforeAll
   public static void setup() throws IOException {
@@ -84,6 +91,16 @@ public void before(
     managementApi = client.managementApi(credentials);
     catalogApi = client.catalogApi(credentials);
 
+    // Direct S3 client against the S3Mock testcontainer, used by tests to verify
+    // that data/metadata files are actually created and purged in storage.
+    s3Client =
+        S3Client.builder()
+            .endpointOverride(URI.create(s3Container.getHttpEndpoint()))
+            .credentialsProvider(
+                StaticCredentialsProvider.create(
+                    AwsBasicCredentials.create("accessKey", "secretKey")))
+            .region(Region.of("us-west-2"))
+            .forcePathStyle(true) // Required for S3Mock
+            .build();
+
     warehouseDir =
         IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve("spark-warehouse");
     catalogName = client.newEntityName("spark_catalog");
@@ -236,4 +253,14 @@ protected void cleanupCatalog(String catalogName) {
   protected static Dataset onSpark(@Language("SQL") String sql) {
     return spark.sql(sql);
   }
+
+  /** Returns {@code true} iff an object with the given key exists in the S3Mock test bucket. */
+  protected boolean fileExists(String key) {
+    try {
+      s3Client.headObject(HeadObjectRequest.builder().bucket("my-bucket").key(key).build());
+      return true;
+    } catch (NoSuchKeyException e) {
+      return false; // object is absent; any other S3 error propagates to the caller
+    }
+  }
 }
diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisSparkIntegrationTest.java b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisSparkIntegrationTest.java
index 2bdd109281..8817ca24d3 100644
--- a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisSparkIntegrationTest.java
+++ b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisSparkIntegrationTest.java
@@ -133,6 +133,33 @@ public void testCreateAndUpdateExternalTable() {
     assertThat(rowCount).isEqualTo(4);
   }
 
+  @Test
+  public void testPurgeTable() {
+    onSpark("CREATE NAMESPACE ns1");
+    onSpark("USE ns1");
+    onSpark("CREATE TABLE tb1 (col1 integer, col2 string)");
+    onSpark("INSERT INTO tb1 VALUES (1, 'a'), (2, 'b'), (3, 'c'), (5, 'e')");
+
+    LoadTableResponse tableResponse = loadTable(catalogName, "ns1", "tb1");
+    String filePath = tableResponse.metadataLocation().replaceFirst("^s3://my-bucket/", "");
+    assertThat(fileExists(filePath)).isTrue();
+
+    // Drop table with purge
+    onSpark("DROP TABLE tb1 purge");
+    // verify the metadata file is eventually purged (purge runs asynchronously)
+    int attempt = 0;
+    while (fileExists(filePath) && attempt < 5) {
+      try {
+        Thread.sleep(1000);
+        attempt += 1;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // restore interrupt status before rethrowing
+        throw new RuntimeException(e);
+      }
+    }
+    assertThat(fileExists(filePath)).isFalse();
+  }
+
   @Test
   public void testCreateView() {
     long namespaceCount = onSpark("SHOW NAMESPACES").count();