read actual schema, not expected schema
edgao committed Nov 15, 2024
1 parent a0c9f83 commit 645501b
Showing 4 changed files with 59 additions and 19 deletions.
@@ -4,6 +4,16 @@

package io.airbyte.cdk.load.file.avro

+import io.airbyte.cdk.load.command.DestinationStream
+import io.airbyte.cdk.load.data.AirbyteType
+import io.airbyte.cdk.load.data.ArrayType
+import io.airbyte.cdk.load.data.BooleanType
+import io.airbyte.cdk.load.data.FieldType
+import io.airbyte.cdk.load.data.IntegerType
+import io.airbyte.cdk.load.data.NumberType
+import io.airbyte.cdk.load.data.ObjectType
+import io.airbyte.cdk.load.data.StringType
+import io.airbyte.cdk.load.data.UnionType
import java.io.Closeable
import java.io.InputStream
import kotlin.io.path.outputStream
@@ -34,15 +44,45 @@ class AvroReader(
}
}

-fun InputStream.toAvroReader(avroSchema: Schema): AvroReader {
-    val reader = GenericDatumReader<GenericRecord>(avroSchema)
+fun InputStream.toAvroReader(descriptor: DestinationStream.Descriptor): AvroReader {
+    val reader = GenericDatumReader<GenericRecord>()
    val tmpFile =
        kotlin.io.path.createTempFile(
-            prefix = "${avroSchema.namespace}.${avroSchema.name}",
+            prefix = "${descriptor.namespace}.${descriptor.name}",
            suffix = ".avro"
        )
    tmpFile.outputStream().use { outputStream -> this.copyTo(outputStream) }
    val file = tmpFile.toFile()
    val dataFileReader = DataFileReader(file, reader)
    return AvroReader(dataFileReader, file)
}

+fun toAirbyteType(schema: Schema): AirbyteType {
+    return when (schema.type) {
+        Schema.Type.STRING -> StringType
+        Schema.Type.INT,
+        Schema.Type.LONG -> IntegerType
+        Schema.Type.FLOAT,
+        Schema.Type.DOUBLE -> NumberType
+        Schema.Type.BOOLEAN -> BooleanType
+        Schema.Type.RECORD ->
+            ObjectType(
+                schema.fields.associateTo(linkedMapOf()) {
+                    it.name() to FieldType(toAirbyteType(it.schema()), nullable = true)
+                }
+            )
+        Schema.Type.ARRAY ->
+            ArrayType(FieldType(toAirbyteType(schema.elementType), nullable = true))
+        Schema.Type.UNION ->
+            UnionType(
+                schema.types
+                    .filter { it.type != Schema.Type.NULL }
+                    .map { toAirbyteType(it) }
+                    .toSet()
+            )
+        Schema.Type.NULL ->
+            throw IllegalStateException(
+                "Null should only appear in union types, and should have been " +
+                    "handled in an earlier recursion. This is a bug."
+            )
+        else -> throw IllegalArgumentException("Unsupported Avro schema $schema")
+    }
+}
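
A note on the two additions above: a GenericDatumReader constructed without arguments resolves the schema from the Avro object container file itself (the writer's schema is stored in the file header), so the reader no longer needs the schema the catalog said to expect. The new toAirbyteType then translates that recovered Avro schema into the CDK's AirbyteType model. Below is a minimal sketch of the mapping, using Avro's SchemaBuilder; the schema shape is invented for illustration and is not part of the commit.

import io.airbyte.cdk.load.file.avro.toAirbyteType
import org.apache.avro.SchemaBuilder

fun demoSchemaMapping() {
    // Hypothetical writer schema: a record with a required string and an
    // optional (nullable) long.
    val avroSchema =
        SchemaBuilder.record("users")
            .namespace("public")
            .fields()
            .requiredString("id")
            .optionalLong("age")
            .endRecord()
    // "id" maps to StringType; "age" arrives as union(null, long), and the
    // UNION branch drops the null member, leaving a UnionType of IntegerType.
    println(toAirbyteType(avroSchema))
}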
@@ -15,11 +15,13 @@ import io.airbyte.cdk.load.data.avro.AvroMapperPipelineFactory
import io.airbyte.cdk.load.data.avro.toAirbyteValue
import io.airbyte.cdk.load.data.avro.toAvroSchema
import io.airbyte.cdk.load.data.csv.toAirbyteValue
+import io.airbyte.cdk.load.data.json.JsonToAirbyteValue
import io.airbyte.cdk.load.data.json.toAirbyteValue
import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory
import io.airbyte.cdk.load.data.withAirbyteMeta
import io.airbyte.cdk.load.file.GZIPProcessor
import io.airbyte.cdk.load.file.NoopProcessor
+import io.airbyte.cdk.load.file.avro.toAirbyteType
import io.airbyte.cdk.load.file.avro.toAvroReader
import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
@@ -46,9 +48,6 @@ class ObjectStorageDataDumper(
private val formatConfig: ObjectStorageFormatConfiguration,
private val compressionConfig: ObjectStorageCompressionConfiguration<*>? = null
) {
-    private val avroMapperPipeline = AvroMapperPipelineFactory().create(stream)
-    private val parquetMapperPipeline = ParquetMapperPipelineFactory().create(stream)
-
fun dump(): List<OutputRecord> {
val prefix = pathFactory.getFinalDirectory(stream).toString()
return runBlocking {
@@ -82,9 +81,7 @@
.bufferedReader()
.lineSequence()
.map { line ->
-line
-    .deserializeToNode()
-    .toAirbyteValue(stream.schema.withAirbyteMeta(wasFlattened))
+JsonToAirbyteValue().fromJson(line.deserializeToNode())
.maybeUnflatten(wasFlattened)
.toOutputRecord()
}
@@ -101,27 +98,25 @@
}
}
is AvroFormatConfiguration -> {
-val finalSchema = avroMapperPipeline.finalSchema.withAirbyteMeta(wasFlattened)
-inputStream.toAvroReader(finalSchema.toAvroSchema(stream.descriptor)).use { reader
+inputStream.toAvroReader(stream.descriptor).use { reader
->
reader
.recordSequence()
.map {
-it.toAirbyteValue(finalSchema)
+it.toAirbyteValue(toAirbyteType(it.schema))
.maybeUnflatten(wasFlattened)
.toOutputRecord()
}
.toList()
}
}
is ParquetFormatConfiguration -> {
-val finalSchema = parquetMapperPipeline.finalSchema.withAirbyteMeta(wasFlattened)
-inputStream.toParquetReader(finalSchema.toAvroSchema(stream.descriptor)).use {
+inputStream.toParquetReader(stream.descriptor).use {
reader ->
reader
.recordSequence()
.map {
-it.toAirbyteValue(finalSchema)
+it.toAirbyteValue(toAirbyteType(it.schema))
.maybeUnflatten(wasFlattened)
.toOutputRecord()
}
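
With the mapper pipelines deleted above, the dumper no longer rebuilds an expected schema from the configured catalog and hands it to each reader; instead every GenericRecord carries the schema that was actually written to object storage, and toAirbyteType(it.schema) converts it record by record. A minimal sketch of why no expected schema is required (the file path is a placeholder, not from the commit):

import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericRecord

fun main() {
    val file = File("/tmp/sample.avro") // placeholder path
    DataFileReader(file, GenericDatumReader<GenericRecord>()).use { reader ->
        for (record in reader) {
            // record.schema is the writer schema from the file header, not a
            // schema derived from the connector's configured catalog.
            println(record.schema.toString(true))
        }
    }
}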
@@ -14,6 +14,7 @@ import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.avro.AvroReadSupport
import org.apache.parquet.hadoop.ParquetReader as ApacheParquetReader
+import io.airbyte.cdk.load.command.DestinationStream

class ParquetReader(
private val reader: ApacheParquetReader<GenericRecord>,
@@ -31,11 +32,10 @@
}
}

-fun InputStream.toParquetReader(avroSchema: Schema): ParquetReader {
-
+fun InputStream.toParquetReader(descriptor: DestinationStream.Descriptor): ParquetReader {
    val tmpFile =
        kotlin.io.path.createTempFile(
-            prefix = "${avroSchema.namespace}.${avroSchema.name}",
+            prefix = "${descriptor.namespace}.${descriptor.name}",
            suffix = ".avro"
        )
    tmpFile.outputStream().use { outputStream -> this.copyTo(outputStream) }
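
The Parquet path relies on the same idea: a Parquet file stores its schema in the footer, so AvroParquetReader can return GenericRecords carrying the actual writer schema without being handed one up front. A hypothetical sketch under that assumption (path and configuration are invented):

import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.util.HadoopInputFile

fun main() {
    val conf = Configuration()
    val inputFile = HadoopInputFile.fromPath(Path("/tmp/sample.parquet"), conf) // placeholder path
    AvroParquetReader.builder<GenericRecord>(inputFile).withConf(conf).build().use { reader ->
        generateSequence { reader.read() }.forEach { record ->
            // The Avro schema is reconstructed from the Parquet footer, so
            // record.schema reflects what was actually written.
            println(record.schema)
        }
    }
}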
@@ -92,7 +92,12 @@ class S3V2WriteTestCsvUncompressed :
promoteUnionToObject = false,
preserveUndeclaredFields = true,
allTypesBehavior = Untyped,
-    )
+    ) {
+    @Test
+    override fun testAppendSchemaEvolution() {
+        super.testAppendSchemaEvolution()
+    }
+}
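
Re-declaring an inherited test with @Test and a delegating super call is a common JUnit 5 pattern: method annotations are not inherited across overrides, so a subclass can re-attach (or drop) annotations, for example to re-enable a test that a parent class marked @Disabled. A generic sketch of the pattern; the class names and @Disabled reason are illustrative, not from this repo:

import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test

open class BaseWriteTest {
    @Test
    @Disabled("not supported by every destination")
    open fun testAppendSchemaEvolution() {
        // shared test body
    }
}

class CsvWriteTest : BaseWriteTest() {
    // Overriding strips the parent's annotations; re-adding @Test
    // re-enables the inherited test body for this subclass.
    @Test
    override fun testAppendSchemaEvolution() {
        super.testAppendSchemaEvolution()
    }
}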

class S3V2WriteTestCsvRootLevelFlattening :
S3V2WriteTest(
Expand Down
