Skip to content

Commit

Permalink
Bulk Load CDK: prefix search nonambiguous; filename uses wall time (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Nov 23, 2024
1 parent 8c05dc8 commit b65a21f
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton

/** Destination configuration used in tests; it overrides only the record batch size. */
class MockDestinationConfiguration : DestinationConfiguration() {
    // A batch size of 1 makes tests micro-batch.
    override val recordBatchSizeBytes: Long = 1L
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.time.format.DateTimeFormatter
import java.util.*

interface PathFactory {
fun getLongestStreamConstantPrefix(stream: DestinationStream, isStaging: Boolean): String
fun getStagingDirectory(stream: DestinationStream, streamConstant: Boolean = false): Path
fun getFinalDirectory(stream: DestinationStream, streamConstant: Boolean = false): Path
fun getPathToFile(
Expand Down Expand Up @@ -193,11 +194,15 @@ class ObjectStoragePathFactory(
PathVariable("EPOCH", """\d+""") { it.time.toEpochMilli().toString() },
PathVariable("UUID", """[a-fA-F0-9\\-]{36}""") { UUID.randomUUID().toString() }
)
val PATH_VARIABLES_STREAM_CONSTANT = PATH_VARIABLES.filter { it.variable != "UUID" }
// Subset of PATH_VARIABLES restricted to the NAMESPACE and STREAM_NAME variables.
// NOTE(review): presumably these are the only variables whose values stay fixed for a
// given stream — confirm against the callers that pass `streamConstant = true`.
val PATH_VARIABLES_STREAM_CONSTANT =
    PATH_VARIABLES.filter { pathVariable ->
        when (pathVariable.variable) {
            "NAMESPACE",
            "STREAM_NAME" -> true
            else -> false
        }
    }
val FILENAME_VARIABLES =
listOf(
FileVariable("date", """\d{4}_\d{2}_\d{2}""") { DATE_FORMATTER.format(it.time) },
FileVariable("timestamp", """\d+""") { it.time.toEpochMilli().toString() },
FileVariable("timestamp", """\d+""") {
// NOTE: We use a constant time for the path but wall time for the files
Instant.now().toEpochMilli().toString()
},
FileVariable("part_number", """\d+""") {
it.partNumber?.toString()
?: throw IllegalArgumentException(
Expand Down Expand Up @@ -237,6 +242,19 @@ class ObjectStoragePathFactory(
return Paths.get(prefix, path)
}

/**
 * Returns the leading part of the stream-constant directory path, up to (but not including)
 * the first `$` character.
 *
 * @param stream the stream whose directory is resolved
 * @param isStaging when true the staging directory is used; otherwise the final directory
 * @return the directory path as a string, cut before its first `$`; the entire string when it
 * contains no `$`
 */
override fun getLongestStreamConstantPrefix(
    stream: DestinationStream,
    isStaging: Boolean
): String {
    val streamConstantDirectory =
        if (isStaging) {
            getStagingDirectory(stream, streamConstant = true)
        } else {
            getFinalDirectory(stream, streamConstant = true)
        }
    // NOTE(review): `$` presumably begins the first path variable left unresolved by the
    // stream-constant substitution — confirm against the path template format.
    return streamConstantDirectory.toString().substringBefore('$')
}

override fun getPathToFile(
stream: DestinationStream,
partNumber: Long?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,8 @@ class ObjectStorageFallbackPersister(
) : DestinationStatePersister<ObjectStorageDestinationState> {
override suspend fun load(stream: DestinationStream): ObjectStorageDestinationState {
val matcher = pathFactory.getPathMatcher(stream)
val pathConstant = pathFactory.getFinalDirectory(stream, streamConstant = true).toString()
val firstVariableIndex = pathConstant.indexOfFirst { it == '$' }
val longestUnambiguous =
if (firstVariableIndex > 0) {
pathConstant.substring(0, firstVariableIndex)
} else {
pathConstant
}
pathFactory.getLongestStreamConstantPrefix(stream, isStaging = false)
client
.list(longestUnambiguous)
.mapNotNull { matcher.match(it.key) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ open class MockPathFactory : PathFactory {
return prefix.resolve("file")
}

/**
 * Returns the mock's stream-constant directory path as a string, cut before the first `$`.
 *
 * Implemented in terms of the directory accessors that [PathFactory] requires, so callers
 * that list objects by prefix can run against this mock instead of hitting a `TODO()` stub
 * (which throws [NotImplementedError]).
 *
 * @param stream the stream whose directory is resolved
 * @param isStaging when true the staging directory is used; otherwise the final directory
 * @return the directory path string up to (not including) its first `$`, or the whole string
 * when it contains no `$`
 */
override fun getLongestStreamConstantPrefix(
    stream: DestinationStream,
    isStaging: Boolean
): String {
    // Positional `true` is the `streamConstant` flag declared on the PathFactory interface.
    val directory =
        if (isStaging) {
            getStagingDirectory(stream, true)
        } else {
            getFinalDirectory(stream, true)
        }
    return directory.toString().takeWhile { it != '$' }
}

override fun getPathMatcher(stream: DestinationStream): PathMatcher {
return PathMatcher(
regex = Regex("$prefix/(.*)-(.*)$"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import io.airbyte.cdk.load.util.deserializeToNode
import java.io.InputStream
import java.util.zip.GZIPInputStream
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
Expand All @@ -50,11 +51,16 @@ class ObjectStorageDataDumper(
private val parquetMapperPipeline = ParquetMapperPipelineFactory().create(stream)

fun dump(): List<OutputRecord> {
val prefix = pathFactory.getFinalDirectory(stream).toString()
// Note: this is implicitly a test of the `streamConstant` final directory
// and the path matcher, so a failure here might imply a bug in the metadata-based
// destination state loader, which lists by `prefix` and filters against the matcher.
val prefix = pathFactory.getLongestStreamConstantPrefix(stream, isStaging = false)
val matcher = pathFactory.getPathMatcher(stream)
return runBlocking {
withContext(Dispatchers.IO) {
client
.list(prefix)
.filter { matcher.match(it.key) != null }
.map { listedObject: RemoteObject<*> ->
client.get(listedObject.key) { objectData: InputStream ->
val decompressed =
Expand Down

0 comments on commit b65a21f

Please sign in to comment.