Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk Load CDK: prefix search nonambiguous; filename uses wall time #48623

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton

/** [DestinationConfiguration] for tests that sets the record batch size to a single byte. */
class MockDestinationConfiguration : DestinationConfiguration() {
// Micro-batch for testing.
override val recordBatchSizeBytes = 1L
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.time.format.DateTimeFormatter
import java.util.*

interface PathFactory {
fun getLongestStreamConstantPrefix(stream: DestinationStream, isStaging: Boolean): String
fun getStagingDirectory(stream: DestinationStream, streamConstant: Boolean = false): Path
fun getFinalDirectory(stream: DestinationStream, streamConstant: Boolean = false): Path
fun getPathToFile(
Expand Down Expand Up @@ -193,11 +194,15 @@ class ObjectStoragePathFactory(
PathVariable("EPOCH", """\d+""") { it.time.toEpochMilli().toString() },
PathVariable("UUID", """[a-fA-F0-9\\-]{36}""") { UUID.randomUUID().toString() }
)
val PATH_VARIABLES_STREAM_CONSTANT = PATH_VARIABLES.filter { it.variable != "UUID" }
val PATH_VARIABLES_STREAM_CONSTANT =
PATH_VARIABLES.filter { it.variable == "NAMESPACE" || it.variable == "STREAM_NAME" }
val FILENAME_VARIABLES =
listOf(
FileVariable("date", """\d{4}_\d{2}_\d{2}""") { DATE_FORMATTER.format(it.time) },
FileVariable("timestamp", """\d+""") { it.time.toEpochMilli().toString() },
FileVariable("timestamp", """\d+""") {
// NOTE: We use a constant time for the path but wall time for the files
Instant.now().toEpochMilli().toString()
},
Copy link
Contributor

@tryangul tryangul Nov 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Is there a way to use the DI'd clock here? Also, this being in a companion object, when does it actually get called?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's init, maybe we can initialize it in an init block and reference it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DI'd clock provides the "sync time": it is read once, and that same value is used across all uploads and for every time-based variable except this one, which is supposed to reflect the actual load time.

FileVariable("part_number", """\d+""") {
it.partNumber?.toString()
?: throw IllegalArgumentException(
Expand Down Expand Up @@ -237,6 +242,19 @@ class ObjectStoragePathFactory(
return Paths.get(prefix, path)
}

/**
 * Returns the leading part of the stream-constant directory path, up to (but not including)
 * the first `$` character. When the path contains no `$`, the whole path string is returned.
 *
 * @param stream the stream whose directory is resolved
 * @param isStaging when true the staging directory is used; otherwise the final directory
 * @return the directory path string truncated before its first `$`
 */
override fun getLongestStreamConstantPrefix(
    stream: DestinationStream,
    isStaging: Boolean
): String {
    val directory =
        if (isStaging) {
            getStagingDirectory(stream, streamConstant = true)
        } else {
            getFinalDirectory(stream, streamConstant = true)
        }
    // Cut the rendered path at the first '$'; a path without one is kept whole.
    return directory.toString().substringBefore('$')
}

override fun getPathToFile(
stream: DestinationStream,
partNumber: Long?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,8 @@ class ObjectStorageFallbackPersister(
) : DestinationStatePersister<ObjectStorageDestinationState> {
override suspend fun load(stream: DestinationStream): ObjectStorageDestinationState {
val matcher = pathFactory.getPathMatcher(stream)
val pathConstant = pathFactory.getFinalDirectory(stream, streamConstant = true).toString()
val firstVariableIndex = pathConstant.indexOfFirst { it == '$' }
val longestUnambiguous =
if (firstVariableIndex > 0) {
pathConstant.substring(0, firstVariableIndex)
} else {
pathConstant
}
pathFactory.getLongestStreamConstantPrefix(stream, isStaging = false)
client
.list(longestUnambiguous)
.mapNotNull { matcher.match(it.key) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ open class MockPathFactory : PathFactory {
return prefix.resolve("file")
}

// Not implemented in this mock: the body is a bare TODO(), which throws
// NotImplementedError whenever this method is called.
override fun getLongestStreamConstantPrefix(
stream: DestinationStream,
isStaging: Boolean
): String {
TODO("Not yet implemented")
}

override fun getPathMatcher(stream: DestinationStream): PathMatcher {
return PathMatcher(
regex = Regex("$prefix/(.*)-(.*)$"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import io.airbyte.cdk.load.util.deserializeToNode
import java.io.InputStream
import java.util.zip.GZIPInputStream
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
Expand All @@ -50,11 +51,16 @@ class ObjectStorageDataDumper(
private val parquetMapperPipeline = ParquetMapperPipelineFactory().create(stream)

fun dump(): List<OutputRecord> {
val prefix = pathFactory.getFinalDirectory(stream).toString()
// Note: this is implicitly a test of the `streamConstant` final directory
// and the path matcher, so a failure here might imply a bug in the metadata-based
// destination state loader, which lists by `prefix` and filters against the matcher.
val prefix = pathFactory.getLongestStreamConstantPrefix(stream, isStaging = false)
val matcher = pathFactory.getPathMatcher(stream)
return runBlocking {
withContext(Dispatchers.IO) {
client
.list(prefix)
.filter { matcher.match(it.key) != null }
.map { listedObject: RemoteObject<*> ->
client.get(listedObject.key) { objectData: InputStream ->
val decompressed =
Expand Down
Loading