From deafd18fa66e834ff2f88e52122daf93787a0acd Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Thu, 21 Nov 2024 11:25:16 -0800 Subject: [PATCH] Bulk Load CDK: prefix search nonambiguous; filename uses wall time --- .../MockDestinationConfiguration.kt | 2 +- .../ObjectStoragePathFactory.kt | 22 +++++++++++++++++-- .../ObjectStorageDestinationStateManager.kt | 8 +------ .../io/airbyte/cdk/load/MockPathFactory.kt | 7 ++++++ .../cdk/load/ObjectStorageDataDumper.kt | 8 ++++++- 5 files changed, 36 insertions(+), 11 deletions(-) diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationConfiguration.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationConfiguration.kt index ab14b4741197..df97498a2693 100644 --- a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationConfiguration.kt +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationConfiguration.kt @@ -11,7 +11,7 @@ import io.micronaut.context.annotation.Factory import jakarta.inject.Singleton class MockDestinationConfiguration : DestinationConfiguration() { - // Microbatch for testing + // Micro-batch for testing. override val recordBatchSizeBytes = 1L } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt index 08da74630952..2506790651ce 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt @@ -21,6 +21,7 @@ import java.time.format.DateTimeFormatter import java.util.* interface PathFactory { + fun getLongestStreamConstantPrefix(stream: DestinationStream, isStaging: Boolean): String fun getStagingDirectory(stream: DestinationStream, streamConstant: Boolean = false): Path fun getFinalDirectory(stream: DestinationStream, streamConstant: Boolean = false): Path fun getPathToFile( @@ -193,11 +194,15 @@ class ObjectStoragePathFactory( PathVariable("EPOCH", """\d+""") { it.time.toEpochMilli().toString() }, PathVariable("UUID", """[a-fA-F0-9\\-]{36}""") { UUID.randomUUID().toString() } ) - val PATH_VARIABLES_STREAM_CONSTANT = PATH_VARIABLES.filter { it.variable != "UUID" } + val PATH_VARIABLES_STREAM_CONSTANT = + PATH_VARIABLES.filter { it.variable == "NAMESPACE" || it.variable == "STREAM_NAME" } val FILENAME_VARIABLES = listOf( FileVariable("date", """\d{4}_\d{2}_\d{2}""") { DATE_FORMATTER.format(it.time) }, - FileVariable("timestamp", """\d+""") { it.time.toEpochMilli().toString() }, + FileVariable("timestamp", """\d+""") { + // NOTE: We use a constant time for the path but wall time for the files + Instant.now().toEpochMilli().toString() + }, FileVariable("part_number", """\d+""") { it.partNumber?.toString() ?: throw IllegalArgumentException( @@ -237,6 +242,19 @@ class ObjectStoragePathFactory( return Paths.get(prefix, path) } + override fun getLongestStreamConstantPrefix( + stream: DestinationStream, + isStaging: Boolean + ): String { + return if (isStaging) { + getStagingDirectory(stream, streamConstant = true) + } else { + getFinalDirectory(stream, streamConstant = true) + } + .toString() + .takeWhile { it != '$' } + } + override fun getPathToFile( stream: DestinationStream, partNumber: Long?, diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt index 73c6542e2566..2736876f9a80 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt @@ -163,14 +163,8 @@ class ObjectStorageFallbackPersister( ) : DestinationStatePersister { override suspend fun load(stream: DestinationStream): ObjectStorageDestinationState { val matcher = pathFactory.getPathMatcher(stream) - val pathConstant = pathFactory.getFinalDirectory(stream, streamConstant = true).toString() - val firstVariableIndex = pathConstant.indexOfFirst { it == '$' } val longestUnambiguous = - if (firstVariableIndex > 0) { - pathConstant.substring(0, firstVariableIndex) - } else { - pathConstant - } + pathFactory.getLongestStreamConstantPrefix(stream, isStaging = false) client .list(longestUnambiguous) .mapNotNull { matcher.match(it.key) } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockPathFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockPathFactory.kt index ed38e692eacc..7533402cfd1b 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockPathFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockPathFactory.kt @@ -43,6 +43,13 @@ open class MockPathFactory : PathFactory { return prefix.resolve("file") } + override fun getLongestStreamConstantPrefix( + stream: DestinationStream, + isStaging: Boolean + ): String { + TODO("Not yet implemented") + } + override fun getPathMatcher(stream: DestinationStream): PathMatcher { return PathMatcher( regex = Regex("$prefix/(.*)-(.*)$"), diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt index 7ea13f8f6297..56061decaf80 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt @@ -32,6 +32,7 @@ import io.airbyte.cdk.load.util.deserializeToNode import java.io.InputStream import java.util.zip.GZIPInputStream import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking @@ -50,11 +51,16 @@ class ObjectStorageDataDumper( private val parquetMapperPipeline = ParquetMapperPipelineFactory().create(stream) fun dump(): List { - val prefix = pathFactory.getFinalDirectory(stream).toString() + // Note: this is implicitly a test of the `streamConstant` final directory + // and the path matcher, so a failure here might imply a bug in the metadata-based + // destination state loader, which lists by `prefix` and filters against the matcher. + val prefix = pathFactory.getLongestStreamConstantPrefix(stream, isStaging = false) + val matcher = pathFactory.getPathMatcher(stream) return runBlocking { withContext(Dispatchers.IO) { client .list(prefix) + .filter { matcher.match(it.key) != null } .map { listedObject: RemoteObject<*> -> client.get(listedObject.key) { objectData: InputStream -> val decompressed =