diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/DestinationCleaner.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/DestinationCleaner.kt index 84375e28ea31..78a97b376946 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/DestinationCleaner.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/DestinationCleaner.kt @@ -14,6 +14,11 @@ fun interface DestinationCleaner { * [IntegrationTest.isNamespaceOld] to filter down to namespaces which can be deleted. */ fun cleanup() + + fun compose(other: DestinationCleaner) = DestinationCleaner { + cleanup() + other.cleanup() + } } object NoopDestinationCleaner : DestinationCleaner { diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt index 1a5030340778..2703920bc158 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt @@ -27,6 +27,7 @@ import java.time.LocalDateTime import java.time.ZoneOffset import java.time.format.DateTimeFormatter import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference import kotlin.test.fail import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async @@ -54,6 +55,7 @@ import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension abstract class IntegrationTest( additionalMicronautEnvs: List<String>, val dataDumper: DestinationDataDumper, + /** This object MUST be a singleton. It will be invoked exactly once per gradle run. 
*/ val destinationCleaner: DestinationCleaner, val recordMangler: ExpectedRecordMapper = NoopExpectedRecordMapper, val nameMapper: NameMapper = NoopNameMapper, @@ -67,15 +69,6 @@ abstract class IntegrationTest( // multiple times. val destinationProcessFactory = DestinationProcessFactory.get(additionalMicronautEnvs) - // we want to run the cleaner exactly once per test class. - // technically this is a little inefficient - e.g. multiple test classes could reuse the same - // cleaner, so we'd prefer to only call each of them once. - // but this is simpler to implement than tracking hasRunCleaner across test classes, - // and then also requiring cleaners to be able to recognize themselves as identical. - // (you would think this is just an AfterAll method, but junit requires those to be static, - // so we wouldn't have access to the cleaner instance >.>) - private val hasRunCleaner = AtomicBoolean(false) - @Suppress("DEPRECATION") private val randomSuffix = RandomStringUtils.randomAlphabetic(4) private val timestampString = LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC) @@ -102,9 +95,30 @@ abstract class IntegrationTest( @AfterEach fun teardown() { + // some tests (e.g. CheckIntegrationTest) hardcode the noop cleaner. + // so just skip all the fancy logic if we detect it. + if (destinationCleaner == NoopDestinationCleaner) { + return + } + if (hasRunCleaner.compareAndSet(false, true)) { destinationCleaner.cleanup() } + + // Simple guardrail to prevent people from doing the wrong thing, + // since it's not immediately intuitive. + val firstCleaner = cleanerSeen.compareAndSet(null, destinationCleaner) + val sameCleaner = cleanerSeen.compareAndSet(destinationCleaner, destinationCleaner) + if (!(firstCleaner || sameCleaner)) { + throw IllegalStateException( + """ + Multiple DestinationCleaner instances detected. This is not supported. The cleaner MUST be a singleton. 
+ Cleaners detected: + $destinationCleaner + ${cleanerSeen.get()} + """.trimIndent() + ) + } } fun dumpAndDiffRecords( @@ -338,6 +352,9 @@ abstract class IntegrationTest( return namespaceCreationDate.isBefore(cleanupCutoffDate) } + private val hasRunCleaner = AtomicBoolean(false) + private val cleanerSeen = AtomicReference<DestinationCleaner>(null) + // Connectors are calling System.getenv rather than using micronaut-y properties, // so we have to mock it out, instead of just setting more properties // inside NonDockerizedDestination. diff --git a/airbyte-integrations/connectors/destination-mssql/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriterTest.kt b/airbyte-integrations/connectors/destination-mssql/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriterTest.kt index ff66b2a128e1..118e86302528 100644 --- a/airbyte-integrations/connectors/destination-mssql/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriterTest.kt +++ b/airbyte-integrations/connectors/destination-mssql/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriterTest.kt @@ -24,6 +24,7 @@ import io.airbyte.cdk.load.write.SchematizedNestedValueBehavior import io.airbyte.cdk.load.write.StronglyTyped import io.airbyte.cdk.load.write.UnionBehavior import io.airbyte.cdk.load.write.UnknownTypesBehavior +import io.airbyte.integrations.destination.mssql.v2.BulkInsert.Companion.CONFIG_FILE import io.airbyte.integrations.destination.mssql.v2.config.AzureBlobStorageClientCreator import io.airbyte.integrations.destination.mssql.v2.config.BulkLoadConfiguration import io.airbyte.integrations.destination.mssql.v2.config.DataSourceFactory @@ -37,7 +38,6 @@ import java.nio.file.Path import java.sql.Connection import java.time.Instant import java.time.OffsetDateTime -import java.time.format.DateTimeFormatter import java.time.temporal.ChronoUnit import java.util.UUID import kotlinx.coroutines.flow.toList @@ -141,17 
+141,16 @@ class MSSQLDataDumper(private val configProvider: (MSSQLSpecification) -> MSSQLC } } -class MSSQLDataCleaner( - private val shouldCleanUp: Boolean, - private val mssqlSpecification: MSSQLSpecification?, - private val configProvider: (MSSQLSpecification) -> MSSQLConfiguration -) : DestinationCleaner { - - private val dateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd") +object MSSQLDataCleaner : DestinationCleaner { + private val mssqlSpecification = + ValidatedJsonUtils.parseOne( + MSSQLSpecification::class.java, + Files.readString(Path.of(CONFIG_FILE)) + ) + private val config = + MSSQLConfigurationFactory().makeWithOverrides(mssqlSpecification, emptyMap()) override fun cleanup() { - if (!shouldCleanUp || mssqlSpecification == null) return - // Cleanup azure blobs if in BulkLoad configuration cleanupAzureBlobsIfNeeded() @@ -161,7 +160,7 @@ class MSSQLDataCleaner( /** Cleans up blobs older than 1 hour if the load configuration is [BulkLoadConfiguration]. */ private fun cleanupAzureBlobsIfNeeded() { - val loadConfig = mssqlSpecification!!.toLoadConfiguration().loadTypeConfiguration + val loadConfig = mssqlSpecification.toLoadConfiguration().loadTypeConfiguration if (loadConfig !is BulkLoadConfiguration) return val azureBlobClient = AzureBlobStorageClientCreator.createAzureBlobClient(loadConfig) @@ -192,7 +191,6 @@ class MSSQLDataCleaner( * old (compared to the current local date). */ private fun cleanupOldTestSchemas() { - val config = configProvider(mssqlSpecification!!) 
val dataSource = DataSourceFactory().dataSource(config) dataSource.connection.use { conn -> @@ -262,14 +260,7 @@ internal class StandardInsert : val configOverrides = buildOverridesForTestContainer() MSSQLConfigurationFactory().makeWithOverrides(spec, configOverrides) }, - dataCleaner = - MSSQLDataCleaner( - shouldCleanUp = false, - mssqlSpecification = null, - ) { spec -> - val configOverrides = buildOverridesForTestContainer() - MSSQLConfigurationFactory().makeWithOverrides(spec, configOverrides) - }, + dataCleaner = MSSQLDataCleaner, ) { @Test @@ -301,15 +292,7 @@ internal class BulkInsert : MSSQLDataDumper { spec -> MSSQLConfigurationFactory().makeWithOverrides(spec, emptyMap()) }, - dataCleaner = - MSSQLDataCleaner( - shouldCleanUp = true, - mssqlSpecification = - ValidatedJsonUtils.parseOne( - MSSQLSpecification::class.java, - Files.readString(Path.of(CONFIG_FILE)) - ) - ) { spec -> MSSQLConfigurationFactory().makeWithOverrides(spec, emptyMap()) }, + dataCleaner = MSSQLDataCleaner, ) { @Test override fun testBasicTypes() { diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeCleaner.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeCleaner.kt new file mode 100644 index 000000000000..52067be1d9ea --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeCleaner.kt @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.s3_data_lake + +import io.airbyte.cdk.load.data.icerberg.parquet.IcebergDestinationCleaner +import io.airbyte.cdk.load.test.util.DestinationCleaner + +object S3DataLakeCleaner : DestinationCleaner { + private val actualCleaner = + IcebergDestinationCleaner( + S3DataLakeTestUtil.getCatalog( + S3DataLakeTestUtil.parseConfig(S3DataLakeTestUtil.GLUE_CONFIG_PATH), + S3DataLakeTestUtil.getAwsAssumeRoleCredentials(), + ), + ) + .compose( + IcebergDestinationCleaner( + S3DataLakeTestUtil.getCatalog( + S3DataLakeTestUtil.parseConfig( + S3DataLakeTestUtil.GLUE_ASSUME_ROLE_CONFIG_PATH + ), + S3DataLakeTestUtil.getAwsAssumeRoleCredentials(), + ), + ), + ) + + override fun cleanup() { + actualCleaner.cleanup() + } +} diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt index 15d2dc43a366..1e60992c57d6 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt @@ -14,12 +14,9 @@ import io.airbyte.cdk.load.data.ArrayType import io.airbyte.cdk.load.data.FieldType import io.airbyte.cdk.load.data.NumberType import io.airbyte.cdk.load.data.ObjectType -import io.airbyte.cdk.load.data.icerberg.parquet.IcebergDestinationCleaner import io.airbyte.cdk.load.data.icerberg.parquet.IcebergWriteTest import io.airbyte.cdk.load.message.InputRecord import io.airbyte.cdk.load.message.Meta -import io.airbyte.cdk.load.test.util.DestinationCleaner -import io.airbyte.cdk.load.test.util.NoopDestinationCleaner 
import io.airbyte.cdk.load.test.util.OutputRecord import io.airbyte.cdk.load.toolkits.iceberg.parquet.SimpleTableIdGenerator import io.airbyte.cdk.load.toolkits.iceberg.parquet.TableIdGenerator @@ -40,7 +37,6 @@ import org.junit.jupiter.api.parallel.ExecutionMode abstract class S3DataLakeWriteTest( configContents: String, - destinationCleaner: DestinationCleaner, tableIdGenerator: TableIdGenerator, ) : IcebergWriteTest( @@ -52,7 +48,7 @@ abstract class S3DataLakeWriteTest( S3DataLakeTestUtil.getAwsAssumeRoleCredentials(), ) }, - destinationCleaner, + S3DataLakeCleaner, tableIdGenerator, additionalMicronautEnvs = S3DataLakeDestination.additionalMicronautEnvs, micronautProperties = @@ -62,12 +58,6 @@ abstract class S3DataLakeWriteTest( class GlueWriteTest : S3DataLakeWriteTest( Files.readString(S3DataLakeTestUtil.GLUE_CONFIG_PATH), - IcebergDestinationCleaner( - S3DataLakeTestUtil.getCatalog( - S3DataLakeTestUtil.parseConfig(S3DataLakeTestUtil.GLUE_CONFIG_PATH), - S3DataLakeTestUtil.getAwsAssumeRoleCredentials(), - ) - ), GlueTableIdGenerator(null), ) { @Test @@ -179,22 +169,10 @@ class GlueWriteTest : class GlueAssumeRoleWriteTest : S3DataLakeWriteTest( Files.readString(S3DataLakeTestUtil.GLUE_ASSUME_ROLE_CONFIG_PATH), - IcebergDestinationCleaner( - S3DataLakeTestUtil.getCatalog( - S3DataLakeTestUtil.parseConfig(S3DataLakeTestUtil.GLUE_ASSUME_ROLE_CONFIG_PATH), - S3DataLakeTestUtil.getAwsAssumeRoleCredentials() - ) - ), GlueTableIdGenerator(null), ) -class NessieMinioWriteTest : - S3DataLakeWriteTest( - getConfig(), - // we're writing to ephemeral testcontainers, so no need to clean up after ourselves - NoopDestinationCleaner, - SimpleTableIdGenerator(), - ) { +class NessieMinioWriteTest : S3DataLakeWriteTest(getConfig(), SimpleTableIdGenerator()) { companion object { private fun getToken(): String { @@ -260,8 +238,7 @@ class NessieMinioWriteTest : // even across multiple streams. // so run singlethreaded. 
@Execution(ExecutionMode.SAME_THREAD) -class RestWriteTest : - S3DataLakeWriteTest(getConfig(), NoopDestinationCleaner, SimpleTableIdGenerator()) { +class RestWriteTest : S3DataLakeWriteTest(getConfig(), SimpleTableIdGenerator()) { @Test @Disabled("https://github.com/airbytehq/airbyte-internal-issues/issues/11439")