Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ fun interface DestinationCleaner {
* [IntegrationTest.isNamespaceOld] to filter down to namespaces which can be deleted.
*/
fun cleanup()

/**
 * Returns a [DestinationCleaner] that first runs this cleaner's [cleanup],
 * then [other]'s. Useful for merging several per-catalog cleaners into a
 * single singleton cleaner.
 */
fun compose(other: DestinationCleaner): DestinationCleaner =
    DestinationCleaner {
        this.cleanup()
        other.cleanup()
    }
}

object NoopDestinationCleaner : DestinationCleaner {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import java.time.LocalDateTime
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import kotlin.test.fail
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
Expand Down Expand Up @@ -54,6 +55,7 @@ import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension
abstract class IntegrationTest(
additionalMicronautEnvs: List<String>,
val dataDumper: DestinationDataDumper,
/** This object MUST be a singleton. Its [DestinationCleaner.cleanup] will be invoked exactly once per Gradle run. */
val destinationCleaner: DestinationCleaner,
val recordMangler: ExpectedRecordMapper = NoopExpectedRecordMapper,
val nameMapper: NameMapper = NoopNameMapper,
Expand All @@ -67,15 +69,6 @@ abstract class IntegrationTest(
// multiple times.
val destinationProcessFactory = DestinationProcessFactory.get(additionalMicronautEnvs)

// we want to run the cleaner exactly once per test class.
// technically this is a little inefficient - e.g. multiple test classes could reuse the same
// cleaner, so we'd prefer to only call each of them once.
// but this is simpler to implement than tracking hasRunCleaner across test classes,
// and then also requiring cleaners to be able to recognize themselves as identical.
// (you would think this is just an AfterAll method, but junit requires those to be static,
// so we wouldn't have access to the cleaner instance >.>)
private val hasRunCleaner = AtomicBoolean(false)

@Suppress("DEPRECATION") private val randomSuffix = RandomStringUtils.randomAlphabetic(4)
private val timestampString =
LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC)
Expand All @@ -102,9 +95,30 @@ abstract class IntegrationTest(

/**
 * Post-test hook: triggers the destination cleaner (at most once per instance of
 * [hasRunCleaner]'s scope) and enforces that every test class passes the same
 * [DestinationCleaner] singleton.
 *
 * NOTE(review): [hasRunCleaner] and [cleanerSeen] are declared as instance
 * properties in this file. "Exactly once per Gradle run" (see the constructor
 * KDoc) only holds if these are effectively shared across test instances
 * (e.g. PER_CLASS lifecycle or a singleton cleaner doing its own guarding) —
 * TODO confirm the intended lifecycle.
 */
@AfterEach
fun teardown() {
// some tests (e.g. CheckIntegrationTest) hardcode the noop cleaner.
// so just skip all the fancy logic if we detect it.
if (destinationCleaner == NoopDestinationCleaner) {
return
}

// Only the first teardown to flip the flag actually performs cleanup.
if (hasRunCleaner.compareAndSet(false, true)) {
destinationCleaner.cleanup()
}

// Simple guardrail to prevent people from doing the wrong thing,
// since it's not immediately intuitive.
// AtomicReference.compareAndSet uses reference equality, so this demands the
// exact same cleaner *instance*, not merely an equal one — hence the
// "MUST be a singleton" contract on the constructor parameter.
// firstCleaner: true when no cleaner had been recorded yet (records this one).
val firstCleaner = cleanerSeen.compareAndSet(null, destinationCleaner)
// sameCleaner: true when the recorded cleaner is this very instance.
val sameCleaner = cleanerSeen.compareAndSet(destinationCleaner, destinationCleaner)
// Fail only when a *different* cleaner instance was seen earlier.
if (!(firstCleaner || sameCleaner)) {
throw IllegalStateException(
"""
Multiple DestinationCleaner instances detected. This is not supported. The cleaner MUST be a singleton.
Cleaners detected:
$destinationCleaner
${cleanerSeen.get()}
""".trimIndent()
)
}
}

fun dumpAndDiffRecords(
Expand Down Expand Up @@ -338,6 +352,9 @@ abstract class IntegrationTest(
return namespaceCreationDate.isBefore(cleanupCutoffDate)
}

// Flipped to true by the first teardown() that performs cleanup, so the cleaner
// runs at most once within this object's lifetime.
// NOTE(review): as an instance property this guards once-per-instance, not
// once-per-Gradle-run — confirm the test-instance lifecycle makes this shared.
private val hasRunCleaner = AtomicBoolean(false)
// Records the first DestinationCleaner passed in; teardown() compares later
// cleaners against it (by reference) to enforce the singleton contract.
private val cleanerSeen = AtomicReference<DestinationCleaner>(null)

// Connectors are calling System.getenv rather than using micronaut-y properties,
// so we have to mock it out, instead of just setting more properties
// inside NonDockerizedDestination.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import io.airbyte.cdk.load.write.SchematizedNestedValueBehavior
import io.airbyte.cdk.load.write.StronglyTyped
import io.airbyte.cdk.load.write.UnionBehavior
import io.airbyte.cdk.load.write.UnknownTypesBehavior
import io.airbyte.integrations.destination.mssql.v2.BulkInsert.Companion.CONFIG_FILE
import io.airbyte.integrations.destination.mssql.v2.config.AzureBlobStorageClientCreator
import io.airbyte.integrations.destination.mssql.v2.config.BulkLoadConfiguration
import io.airbyte.integrations.destination.mssql.v2.config.DataSourceFactory
Expand All @@ -37,7 +38,6 @@ import java.nio.file.Path
import java.sql.Connection
import java.time.Instant
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.util.UUID
import kotlinx.coroutines.flow.toList
Expand Down Expand Up @@ -141,17 +141,16 @@ class MSSQLDataDumper(private val configProvider: (MSSQLSpecification) -> MSSQLC
}
}

class MSSQLDataCleaner(
private val shouldCleanUp: Boolean,
private val mssqlSpecification: MSSQLSpecification?,
private val configProvider: (MSSQLSpecification) -> MSSQLConfiguration
) : DestinationCleaner {

private val dateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd")
object MSSQLDataCleaner : DestinationCleaner {
private val mssqlSpecification =
ValidatedJsonUtils.parseOne(
MSSQLSpecification::class.java,
Files.readString(Path.of(CONFIG_FILE))
)
private val config =
MSSQLConfigurationFactory().makeWithOverrides(mssqlSpecification, emptyMap())

override fun cleanup() {
if (!shouldCleanUp || mssqlSpecification == null) return

// Cleanup azure blobs if in BulkLoad configuration
cleanupAzureBlobsIfNeeded()

Expand All @@ -161,7 +160,7 @@ class MSSQLDataCleaner(

/** Cleans up blobs older than 1 hour if the load configuration is [BulkLoadConfiguration]. */
private fun cleanupAzureBlobsIfNeeded() {
val loadConfig = mssqlSpecification!!.toLoadConfiguration().loadTypeConfiguration
val loadConfig = mssqlSpecification.toLoadConfiguration().loadTypeConfiguration
if (loadConfig !is BulkLoadConfiguration) return

val azureBlobClient = AzureBlobStorageClientCreator.createAzureBlobClient(loadConfig)
Expand Down Expand Up @@ -192,7 +191,6 @@ class MSSQLDataCleaner(
* old (compared to the current local date).
*/
private fun cleanupOldTestSchemas() {
val config = configProvider(mssqlSpecification!!)
val dataSource = DataSourceFactory().dataSource(config)

dataSource.connection.use { conn ->
Expand Down Expand Up @@ -262,14 +260,7 @@ internal class StandardInsert :
val configOverrides = buildOverridesForTestContainer()
MSSQLConfigurationFactory().makeWithOverrides(spec, configOverrides)
},
dataCleaner =
MSSQLDataCleaner(
shouldCleanUp = false,
mssqlSpecification = null,
) { spec ->
val configOverrides = buildOverridesForTestContainer()
MSSQLConfigurationFactory().makeWithOverrides(spec, configOverrides)
},
dataCleaner = MSSQLDataCleaner,
) {

@Test
Expand Down Expand Up @@ -301,15 +292,7 @@ internal class BulkInsert :
MSSQLDataDumper { spec ->
MSSQLConfigurationFactory().makeWithOverrides(spec, emptyMap())
},
dataCleaner =
MSSQLDataCleaner(
shouldCleanUp = true,
mssqlSpecification =
ValidatedJsonUtils.parseOne(
MSSQLSpecification::class.java,
Files.readString(Path.of(CONFIG_FILE))
)
) { spec -> MSSQLConfigurationFactory().makeWithOverrides(spec, emptyMap()) },
dataCleaner = MSSQLDataCleaner,
) {
@Test
override fun testBasicTypes() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3_data_lake

import io.airbyte.cdk.load.data.icerberg.parquet.IcebergDestinationCleaner
import io.airbyte.cdk.load.test.util.DestinationCleaner

/**
 * Singleton [DestinationCleaner] for the S3 Data Lake tests.
 *
 * Delegates to two [IcebergDestinationCleaner]s — one per Glue config (standard and
 * assume-role) — composed so that both catalogs are cleaned in a single pass.
 */
object S3DataLakeCleaner : DestinationCleaner {
    // Cleaner for the catalog behind the standard Glue config.
    private val glueCleaner =
        IcebergDestinationCleaner(
            S3DataLakeTestUtil.getCatalog(
                S3DataLakeTestUtil.parseConfig(S3DataLakeTestUtil.GLUE_CONFIG_PATH),
                S3DataLakeTestUtil.getAwsAssumeRoleCredentials(),
            ),
        )

    // Cleaner for the catalog behind the Glue assume-role config.
    private val glueAssumeRoleCleaner =
        IcebergDestinationCleaner(
            S3DataLakeTestUtil.getCatalog(
                S3DataLakeTestUtil.parseConfig(
                    S3DataLakeTestUtil.GLUE_ASSUME_ROLE_CONFIG_PATH
                ),
                S3DataLakeTestUtil.getAwsAssumeRoleCredentials(),
            ),
        )

    // Run the standard-config cleaner first, then the assume-role one.
    private val delegate = glueCleaner.compose(glueAssumeRoleCleaner)

    override fun cleanup() = delegate.cleanup()
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@ import io.airbyte.cdk.load.data.ArrayType
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.NumberType
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.icerberg.parquet.IcebergDestinationCleaner
import io.airbyte.cdk.load.data.icerberg.parquet.IcebergWriteTest
import io.airbyte.cdk.load.message.InputRecord
import io.airbyte.cdk.load.message.Meta
import io.airbyte.cdk.load.test.util.DestinationCleaner
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.cdk.load.toolkits.iceberg.parquet.SimpleTableIdGenerator
import io.airbyte.cdk.load.toolkits.iceberg.parquet.TableIdGenerator
Expand All @@ -40,7 +37,6 @@ import org.junit.jupiter.api.parallel.ExecutionMode

abstract class S3DataLakeWriteTest(
configContents: String,
destinationCleaner: DestinationCleaner,
tableIdGenerator: TableIdGenerator,
) :
IcebergWriteTest(
Expand All @@ -52,7 +48,7 @@ abstract class S3DataLakeWriteTest(
S3DataLakeTestUtil.getAwsAssumeRoleCredentials(),
)
},
destinationCleaner,
S3DataLakeCleaner,
tableIdGenerator,
additionalMicronautEnvs = S3DataLakeDestination.additionalMicronautEnvs,
micronautProperties =
Expand All @@ -62,12 +58,6 @@ abstract class S3DataLakeWriteTest(
class GlueWriteTest :
S3DataLakeWriteTest(
Files.readString(S3DataLakeTestUtil.GLUE_CONFIG_PATH),
IcebergDestinationCleaner(
S3DataLakeTestUtil.getCatalog(
S3DataLakeTestUtil.parseConfig(S3DataLakeTestUtil.GLUE_CONFIG_PATH),
S3DataLakeTestUtil.getAwsAssumeRoleCredentials(),
)
),
GlueTableIdGenerator(null),
) {
@Test
Expand Down Expand Up @@ -179,22 +169,10 @@ class GlueWriteTest :
class GlueAssumeRoleWriteTest :
S3DataLakeWriteTest(
Files.readString(S3DataLakeTestUtil.GLUE_ASSUME_ROLE_CONFIG_PATH),
IcebergDestinationCleaner(
S3DataLakeTestUtil.getCatalog(
S3DataLakeTestUtil.parseConfig(S3DataLakeTestUtil.GLUE_ASSUME_ROLE_CONFIG_PATH),
S3DataLakeTestUtil.getAwsAssumeRoleCredentials()
)
),
GlueTableIdGenerator(null),
)

class NessieMinioWriteTest :
S3DataLakeWriteTest(
getConfig(),
// we're writing to ephemeral testcontainers, so no need to clean up after ourselves
NoopDestinationCleaner,
SimpleTableIdGenerator(),
) {
class NessieMinioWriteTest : S3DataLakeWriteTest(getConfig(), SimpleTableIdGenerator()) {

companion object {
private fun getToken(): String {
Expand Down Expand Up @@ -260,8 +238,7 @@ class NessieMinioWriteTest :
// even across multiple streams.
// so run singlethreaded.
@Execution(ExecutionMode.SAME_THREAD)
class RestWriteTest :
S3DataLakeWriteTest(getConfig(), NoopDestinationCleaner, SimpleTableIdGenerator()) {
class RestWriteTest : S3DataLakeWriteTest(getConfig(), SimpleTableIdGenerator()) {

@Test
@Disabled("https://github.com/airbytehq/airbyte-internal-issues/issues/11439")
Expand Down
Loading