diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/file/avro/AvroReader.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/file/avro/AvroReader.kt
index 22f335b0abb3..06a57354478c 100644
--- a/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/file/avro/AvroReader.kt
+++ b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/file/avro/AvroReader.kt
@@ -4,6 +4,16 @@
 
 package io.airbyte.cdk.load.file.avro
 
+import io.airbyte.cdk.load.command.DestinationStream
+import io.airbyte.cdk.load.data.AirbyteType
+import io.airbyte.cdk.load.data.ArrayType
+import io.airbyte.cdk.load.data.BooleanType
+import io.airbyte.cdk.load.data.FieldType
+import io.airbyte.cdk.load.data.IntegerType
+import io.airbyte.cdk.load.data.NumberType
+import io.airbyte.cdk.load.data.ObjectType
+import io.airbyte.cdk.load.data.StringType
+import io.airbyte.cdk.load.data.UnionType
 import java.io.Closeable
 import java.io.InputStream
 import kotlin.io.path.outputStream
@@ -34,11 +44,11 @@
     }
 }
 
-fun InputStream.toAvroReader(avroSchema: Schema): AvroReader {
-    val reader = GenericDatumReader<GenericRecord>(avroSchema)
+fun InputStream.toAvroReader(descriptor: DestinationStream.Descriptor): AvroReader {
+    val reader = GenericDatumReader<GenericRecord>()
     val tmpFile =
         kotlin.io.path.createTempFile(
-            prefix = "${avroSchema.namespace}.${avroSchema.name}",
+            prefix = "${descriptor.namespace}.${descriptor.name}",
             suffix = ".avro"
         )
     tmpFile.outputStream().use { outputStream -> this.copyTo(outputStream) }
@@ -46,3 +56,33 @@ fun InputStream.toAvroReader(avroSchema: Schema): AvroReader {
     val dataFileReader = DataFileReader(file, reader)
     return AvroReader(dataFileReader, file)
 }
+
+fun toAirbyteType(schema: Schema): AirbyteType {
+    return when (schema.type) {
+        Schema.Type.STRING -> StringType
+        Schema.Type.INT,
+        Schema.Type.LONG -> IntegerType
+        Schema.Type.FLOAT,
+        Schema.Type.DOUBLE -> NumberType
+        Schema.Type.BOOLEAN -> BooleanType
+        Schema.Type.RECORD ->
+            ObjectType(
+                schema.fields.associateTo(linkedMapOf()) {
+                    it.name() to FieldType(toAirbyteType(it.schema()), nullable = true)
+                }
+            )
+        Schema.Type.ARRAY ->
+            ArrayType(FieldType(toAirbyteType(schema.elementType), nullable = true))
+        Schema.Type.UNION ->
+            UnionType(
+                schema.types
+                    .filter { it.type != Schema.Type.NULL }
+                    .map { toAirbyteType(it) }
+                    .toSet()
+            )
+        Schema.Type.NULL ->
+            throw IllegalStateException("Null should only appear in union types, and should have been handled in an earlier recursion. This is a bug.")
+        else ->
+            throw IllegalArgumentException("Unsupported Avro schema $schema")
+    }
+}
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt
index 7ea13f8f6297..a647d3506078 100644
--- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt
@@ -15,11 +15,13 @@ import io.airbyte.cdk.load.data.avro.AvroMapperPipelineFactory
 import io.airbyte.cdk.load.data.avro.toAirbyteValue
 import io.airbyte.cdk.load.data.avro.toAvroSchema
 import io.airbyte.cdk.load.data.csv.toAirbyteValue
+import io.airbyte.cdk.load.data.json.JsonToAirbyteValue
 import io.airbyte.cdk.load.data.json.toAirbyteValue
 import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory
 import io.airbyte.cdk.load.data.withAirbyteMeta
 import io.airbyte.cdk.load.file.GZIPProcessor
 import io.airbyte.cdk.load.file.NoopProcessor
+import io.airbyte.cdk.load.file.avro.toAirbyteType
 import io.airbyte.cdk.load.file.avro.toAvroReader
 import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
 import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
@@ -46,9 +48,6 @@ class ObjectStorageDataDumper(
     private val formatConfig: ObjectStorageFormatConfiguration,
     private val compressionConfig: ObjectStorageCompressionConfiguration<*>? = null
 ) {
-    private val avroMapperPipeline = AvroMapperPipelineFactory().create(stream)
-    private val parquetMapperPipeline = ParquetMapperPipelineFactory().create(stream)
-
     fun dump(): List<OutputRecord> {
         val prefix = pathFactory.getFinalDirectory(stream).toString()
         return runBlocking {
@@ -82,9 +81,7 @@
                     .bufferedReader()
                     .lineSequence()
                     .map { line ->
-                        line
-                            .deserializeToNode()
-                            .toAirbyteValue(stream.schema.withAirbyteMeta(wasFlattened))
+                        JsonToAirbyteValue().fromJson(line.deserializeToNode())
                             .maybeUnflatten(wasFlattened)
                             .toOutputRecord()
                     }
@@ -101,13 +98,12 @@
                 }
             }
             is AvroFormatConfiguration -> {
-                val finalSchema = avroMapperPipeline.finalSchema.withAirbyteMeta(wasFlattened)
-                inputStream.toAvroReader(finalSchema.toAvroSchema(stream.descriptor)).use { reader
+                inputStream.toAvroReader(stream.descriptor).use { reader
                     ->
                     reader
                         .recordSequence()
                         .map {
-                            it.toAirbyteValue(finalSchema)
+                            it.toAirbyteValue(toAirbyteType(it.schema))
                                 .maybeUnflatten(wasFlattened)
                                 .toOutputRecord()
                         }
@@ -115,13 +111,12 @@
                 }
             }
             is ParquetFormatConfiguration -> {
-                val finalSchema = parquetMapperPipeline.finalSchema.withAirbyteMeta(wasFlattened)
-                inputStream.toParquetReader(finalSchema.toAvroSchema(stream.descriptor)).use {
+                inputStream.toParquetReader(stream.descriptor).use {
                     reader ->
                     reader
                         .recordSequence()
                         .map {
-                            it.toAirbyteValue(finalSchema)
+                            it.toAirbyteValue(toAirbyteType(it.schema))
                                 .maybeUnflatten(wasFlattened)
                                 .toOutputRecord()
                         }
diff --git a/airbyte-cdk/bulk/toolkits/load-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/file/parquet/ParquetReader.kt b/airbyte-cdk/bulk/toolkits/load-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/file/parquet/ParquetReader.kt
index 384a2753bad2..453488f59e85 100644
--- a/airbyte-cdk/bulk/toolkits/load-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/file/parquet/ParquetReader.kt
+++ b/airbyte-cdk/bulk/toolkits/load-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/file/parquet/ParquetReader.kt
@@ -14,6 +14,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.parquet.avro.AvroParquetReader
 import org.apache.parquet.avro.AvroReadSupport
 import org.apache.parquet.hadoop.ParquetReader as ApacheParquetReader
+import io.airbyte.cdk.load.command.DestinationStream
 
 class ParquetReader(
     private val reader: ApacheParquetReader<GenericRecord>,
@@ -31,11 +32,10 @@
     }
 }
 
-fun InputStream.toParquetReader(avroSchema: Schema): ParquetReader {
-
+fun InputStream.toParquetReader(descriptor: DestinationStream.Descriptor): ParquetReader {
     val tmpFile =
         kotlin.io.path.createTempFile(
-            prefix = "${avroSchema.namespace}.${avroSchema.name}",
+            prefix = "${descriptor.namespace}.${descriptor.name}",
             suffix = ".avro"
         )
     tmpFile.outputStream().use { outputStream -> this.copyTo(outputStream) }
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt
index 2147478123f7..ffc2d2b5efbc 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt
@@ -89,7 +89,12 @@ class S3V2WriteTestCsvUncompressed :
         promoteUnionToObject = false,
         preserveUndeclaredFields = true,
        allTypesBehavior = Untyped,
-    )
+    ) {
+    @Test
+    override fun testAppendSchemaEvolution() {
+        super.testAppendSchemaEvolution()
+    }
+}
 
 class S3V2WriteTestCsvRootLevelFlattening :
     S3V2WriteTest(
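
Reviewer note (not part of the diff): a minimal sketch of the inference path the data dumpers switch onto above, assuming Avro's SchemaBuilder is on the classpath. Instead of rebuilding the mapper pipeline's final schema and handing it to the readers, each file's embedded writer schema is now converted on the fly by the new toAirbyteType helper. The record and field names below are hypothetical.

import io.airbyte.cdk.load.file.avro.toAirbyteType
import org.apache.avro.SchemaBuilder

fun main() {
    // Hypothetical schema, standing in for the writer schema embedded in a dumped Avro/Parquet file.
    val avroSchema =
        SchemaBuilder.record("users")
            .namespace("public")
            .fields()
            .name("id").type().longType().noDefault()
            .name("email").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
            .endRecord()

    // RECORD maps to ObjectType with every field marked nullable; the NULL branch of the
    // ["null", "string"] union is filtered out first, so "email" comes back as
    // UnionType(setOf(StringType)).
    println(toAirbyteType(avroSchema))
}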