From 6434a1ee60fc20eb2de8c34c70d75ac76080d192 Mon Sep 17 00:00:00 2001
From: Limian Zhang <75445426+rzhang10@users.noreply.github.com>
Date: Fri, 9 Sep 2022 15:07:17 -0700
Subject: [PATCH 1/3] Spark: Demonstrate reading Avro files with default value
 using a mock defaultMap

---
 .../iceberg/avro/BuildAvroProjection.java     |   5 +
 .../iceberg/avro/SingleValueToAvroValue.java  |  91 ++++++++++
 .../iceberg/spark/source/BaseReader.java      |  22 ++-
 .../iceberg/spark/data/TestHelpers.java       |  19 +-
 .../data/TestSparkAvroReadDefaultValue.java   | 169 ++++++++++++++++++
 5 files changed, 301 insertions(+), 5 deletions(-)
 create mode 100644 core/src/main/java/org/apache/iceberg/avro/SingleValueToAvroValue.java
 create mode 100644 spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReadDefaultValue.java

diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
index 3f1a71a9e6c2..28409477d55c 100644
--- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
+++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
@@ -101,6 +101,11 @@ public Schema record(Schema record, List names, Iterable s
         updatedFields.add(avroField);
 
       } else {
+        // TODO: once the API support for default values is added, the condition
+        // (field.isRequired() && field.initialDefaultValue() != null)
+        // can also pass this validation. Such a field simply does not need to be
+        // added to the Avro file read schema, since reading its default value is
+        // handled at the Iceberg level by the constant map.
         Preconditions.checkArgument(
             field.isOptional() || MetadataColumns.metadataFieldIds().contains(field.fieldId()),
             "Missing required field: %s",
diff --git a/core/src/main/java/org/apache/iceberg/avro/SingleValueToAvroValue.java b/core/src/main/java/org/apache/iceberg/avro/SingleValueToAvroValue.java
new file mode 100644
index 000000000000..29fb77c531c8
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/avro/SingleValueToAvroValue.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
+
+public class SingleValueToAvroValue {
+
+  private SingleValueToAvroValue() {}
+
+  public static Object convert(Type type, Object value) {
+    switch (type.typeId()) {
+      case STRUCT:
+        Types.StructType structType = type.asNestedType().asStructType();
+        StructLike structValue = (StructLike) value;
+        GenericData.Record rec = new GenericData.Record(AvroSchemaUtil.convert(type));
+
+        for (int i = 0; i < structValue.size(); i += 1) {
+          Type fieldType = structType.fields().get(i).type();
+          rec.put(i, convert(fieldType, structValue.get(i, fieldType.typeId().javaClass())));
+        }
+
+        return rec;
+
+      case LIST:
+        Types.ListType listType = type.asNestedType().asListType();
+        Type elementType = listType.elementType();
+        List<?> listValue = (List<?>) value;
+
+        List<Object> list = Lists.newArrayListWithExpectedSize(listValue.size());
+        for (Object o : listValue) {
+          list.add(convert(elementType, o));
+        }
+
+        return list;
+
+      case MAP:
+        Types.MapType mapType = type.asNestedType().asMapType();
+        Map<?, ?> mapValue = (Map<?, ?>) value;
+
+        Map<Object, Object> map = Maps.newLinkedHashMap();
+        for (Map.Entry<?, ?> entry : mapValue.entrySet()) {
+          map.put(
+              convert(mapType.keyType(), entry.getKey()),
+              convert(mapType.valueType(), entry.getValue()));
+        }
+
+        return map;
+
+      default:
+        return convertPrimitive(type.asPrimitiveType(), value);
+    }
+  }
+
+  private static Object convertPrimitive(Type.PrimitiveType primitive, Object value) {
+    // For the primitives where Avro needs a different type than Spark, fix
+    // them here.
+    switch (primitive.typeId()) {
+      case FIXED:
+        return new GenericData.Fixed(
+            AvroSchemaUtil.convert(primitive), ByteBuffers.toByteArray((ByteBuffer) value));
+      default:
+        return value;
+    }
+  }
+}
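For context, a minimal sketch of how this helper is meant to be used, with an illustrative type and JSON literal borrowed from the test added later in this patch: a default value parsed from its JSON form by SingleValueParser becomes a generic value that Avro records accept.

    // Illustrative sketch, not part of the patch: convert an Iceberg default
    // value (parsed from its JSON representation) into an Avro-compatible value.
    Types.ListType listOfInts = Types.ListType.ofOptional(1, Types.IntegerType.get());
    Object icebergDefault = SingleValueParser.fromJson(listOfInts, "[1, 2, 3]");
    // Lists, maps, and structs convert element by element; primitives pass
    // through, except fixed(n), which is rewrapped as GenericData.Fixed.
    Object avroDefault = SingleValueToAvroValue.convert(listOfInts, icebergDefault);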
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index 2333cd734bbe..1eb662e8beea 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -49,6 +49,7 @@
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.Type;
@@ -59,6 +60,8 @@
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.slf4j.Logger;
@@ -69,7 +72,7 @@
  *
  * @param <T> is the Java class returned by this reader whose objects contain one or more rows.
  */
-abstract class BaseReader<T> implements Closeable {
+public abstract class BaseReader<T> implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(BaseReader.class);
 
   private final Table table;
@@ -201,7 +204,7 @@ private EncryptedInputFile toEncryptedInputFile(ContentFile file) {
     }
   }
 
-  protected static Object convertConstant(Type type, Object value) {
+  public static Object convertConstant(Type type, Object value) {
     if (value == null) {
       return null;
     }
@@ -224,6 +227,21 @@ protected static Object convertConstant(Type type, Object value) {
         return ByteBuffers.toByteArray((ByteBuffer) value);
       case BINARY:
         return ByteBuffers.toByteArray((ByteBuffer) value);
+      case UUID:
+        return UTF8String.fromString(value.toString());
+      case LIST:
+        return new GenericArrayData(
+            ((List<?>) value)
+                .stream().map(e -> convertConstant(type.asListType().elementType(), e)).toArray());
+      case MAP:
+        List<Object> keyList = Lists.newArrayList();
+        List<Object> valueList = Lists.newArrayList();
+        for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
+          keyList.add(convertConstant(type.asMapType().keyType(), entry.getKey()));
+          valueList.add(convertConstant(type.asMapType().valueType(), entry.getValue()));
+        }
+        return new ArrayBasedMapData(
+            new GenericArrayData(keyList.toArray()), new GenericArrayData(valueList.toArray()));
       case STRUCT:
         StructType structType = (StructType) type;
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 69b14eead4d5..79495115d900 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -701,7 +701,6 @@ private static void assertEquals(String context, DataType type, Object expected,
           .as("Actual should be an InternalRow: " + context)
           .isInstanceOf(InternalRow.class);
       assertEquals(context, (StructType) type, (InternalRow) expected, (InternalRow) actual);
-
     } else if (type instanceof ArrayType) {
       Assertions.assertThat(expected)
           .as("Expected should be an ArrayData: " + context)
           .isInstanceOf(ArrayData.class);
       Assertions.assertThat(actual)
           .as("Actual should be an ArrayData: " + context)
           .isInstanceOf(ArrayData.class);
       assertEquals(context, (ArrayType) type, (ArrayData) expected, (ArrayData) actual);
-
     } else if (type instanceof MapType) {
       Assertions.assertThat(expected)
           .as("Expected should be a MapData: " + context)
           .isInstanceOf(MapData.class);
       Assertions.assertThat(actual)
           .as("Actual should be a MapData: " + context)
           .isInstanceOf(MapData.class);
       assertEquals(context, (MapType) type, (MapData) expected, (MapData) actual);
-
     } else if (type instanceof BinaryType) {
       assertEqualBytes(context, (byte[]) expected, (byte[]) actual);
     } else {
@@ -808,4 +805,20 @@ public static Set deleteFiles(Table table) {
 
     return deleteFiles;
   }
+
+  public static Object convertJavaPrimitiveToAvroPrimitive(
+      Map<Type, Schema> typeToSchema, Type.PrimitiveType primitive, Object value) {
+    // For the primitives where Avro needs a different type than Spark, fix
+    // them here.
+    switch (primitive.typeId()) {
+      case FIXED:
+        return new GenericData.Fixed(typeToSchema.get(primitive), (byte[]) value);
+      case BINARY:
+        return ByteBuffer.wrap((byte[]) value);
+      case UUID:
+        return UUID.nameUUIDFromBytes((byte[]) value);
+      default:
+        return value;
+    }
+  }
 }
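With convertConstant now public and aware of UUID, LIST, and MAP, a default value parsed at the Iceberg level can be turned into the Spark-internal constant that the reader splices into each row. A minimal sketch, with fixtures mirroring the test in this patch:

    // Illustrative sketch, not part of the patch: a map default becomes
    // Spark's ArrayBasedMapData of two parallel GenericArrayData arrays.
    Types.MapType mapType =
        Types.MapType.ofOptional(2, 3, Types.IntegerType.get(), Types.StringType.get());
    Object parsed =
        SingleValueParser.fromJson(mapType, "{\"keys\": [1, 2], \"values\": [\"foo\", \"bar\"]}");
    // Integer keys pass through unchanged; String values become UTF8String.
    Object sparkConstant = BaseReader.convertConstant(mapType, parsed);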
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReadDefaultValue.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReadDefaultValue.java
new file mode 100644
index 000000000000..e82a33b75676
--- /dev/null
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReadDefaultValue.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data;
+
+import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SingleValueParser;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.AvroIterable;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.avro.SingleValueToAvroValue;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.source.BaseReader;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestSparkAvroReadDefaultValue {
+
+  @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+  private static final Object[][] typesWithDefaults =
+      new Object[][] {
+        {Types.BooleanType.get(), "true"},
+        {Types.IntegerType.get(), "1"},
+        {Types.LongType.get(), "9999999"},
+        {Types.FloatType.get(), "1.23"},
+        {Types.DoubleType.get(), "123.456"},
+        {Types.DateType.get(), "\"2007-12-03\""},
+        // Spark doesn't support Time type
+        // {Types.TimeType.get(), "\"10:15:30\""},
+        {Types.TimestampType.withoutZone(), "\"2007-12-03T10:15:30\""},
+        {Types.TimestampType.withZone(), "\"2007-12-03T10:15:30+00:00\""},
+        {Types.StringType.get(), "\"foo\""},
+        {Types.UUIDType.get(), "\"eb26bdb1-a1d8-4aa6-990e-da940875492c\""},
+        {Types.FixedType.ofLength(2), "\"111f\""},
+        {Types.BinaryType.get(), "\"0000ff\""},
+        {Types.DecimalType.of(9, 4), "\"123.4500\""},
+        {Types.DecimalType.of(9, 0), "\"2\""},
+        // Avro doesn't support negative scale
+        // {Types.DecimalType.of(9, -20), "\"2E+20\""},
+        {Types.ListType.ofOptional(1, Types.IntegerType.get()), "[1, 2, 3]"},
+        {
+          Types.MapType.ofOptional(2, 3, Types.IntegerType.get(), Types.StringType.get()),
+          "{\"keys\": [1, 2], \"values\": [\"foo\", \"bar\"]}"
+        },
+        {
+          Types.StructType.of(
+              required(4, "f1", Types.IntegerType.get()),
+              optional(5, "f2", Types.StringType.get())),
+          "{\"4\": 1, \"5\": \"bar\"}"
+        },
+        // deeply nested complex types
+        {
+          Types.ListType.ofOptional(
+              6,
+              Types.StructType.of(
+                  required(7, "f1", Types.IntegerType.get()),
+                  optional(8, "f2", Types.StringType.get()))),
+          "[{\"7\": 1, \"8\": \"bar\"}, {\"7\": 2, \"8\": \"foo\"}]"
+        },
+        {
+          Types.MapType.ofOptional(
+              9,
+              10,
+              Types.IntegerType.get(),
+              Types.StructType.of(
+                  required(11, "f1", Types.IntegerType.get()),
+                  optional(12, "f2", Types.StringType.get()))),
+          "{\"keys\": [1, 2], \"values\": [{\"11\": 1, \"12\": \"bar\"}, {\"11\": 2, \"12\": \"foo\"}]}"
+        },
+        {
+          Types.StructType.of(
+              required(
+                  13,
+                  "f1",
+                  Types.StructType.of(
+                      optional(14, "ff1", Types.IntegerType.get()),
+                      optional(15, "ff2", Types.StringType.get()))),
+              optional(
+                  16,
+                  "f2",
+                  Types.StructType.of(
+                      optional(17, "ff1", Types.StringType.get()),
+                      optional(18, "ff2", Types.IntegerType.get())))),
+          "{\"13\": {\"14\": 1, \"15\": \"bar\"}, \"16\": {\"17\": \"bar\", \"18\": 1}}"
+        },
+      };
+
+  @Test
+  public void writeAndValidate() throws IOException {
+    for (Object[] typeWithDefault : typesWithDefaults) {
+      Type type = (Type) typeWithDefault[0];
+      String defaultValueJson = (String) typeWithDefault[1];
+      Object defaultValue = SingleValueParser.fromJson(type, defaultValueJson);
+
+      // Convert the default value to a Spark-compliant constant
+      Object sparkConst = BaseReader.convertConstant(type, defaultValue);
+      Map<Integer, Object> defaultMap = ImmutableMap.of(1000, sparkConst);
+
+      Schema writerSchema = new Schema(required(999, "col1", Types.IntegerType.get()));
+      Schema readerSchema =
+          new Schema(required(999, "col1", Types.IntegerType.get()), optional(1000, "col2", type));
+      Map typeToSchema = AvroSchemaUtil.convertTypes(readerSchema.asStruct(), "test");
+
+      List<GenericData.Record> expected = RandomData.generateList(readerSchema, 100, 0L);
+      for (GenericData.Record record : expected) {
+        record.put(1, SingleValueToAvroValue.convert(type, defaultValue));
+      }
+
+      File testFile = temp.newFile();
+      Assert.assertTrue("Delete should succeed", testFile.delete());
+
+      try (FileAppender<GenericData.Record> writer =
+          Avro.write(Files.localOutput(testFile)).schema(writerSchema).named("test").build()) {
+        for (GenericData.Record rec : expected) {
+          writer.add(rec);
+        }
+      }
+
+      List<InternalRow> rows;
+      try (AvroIterable<InternalRow> reader =
+          Avro.read(Files.localInput(testFile))
+              .createReaderFunc(
+                  readAvroSchema -> new SparkAvroReader(readerSchema, readAvroSchema, defaultMap))
+              .project(readerSchema)
+              .build()) {
+        rows = Lists.newArrayList(reader);
+      }
+
+      for (int i = 0; i < expected.size(); i += 1) {
+        assertEqualsUnsafe(readerSchema.asStruct(), expected.get(i), rows.get(i));
+      }
+    }
+  }
+}
From 77376a5812aafb4491c4959589b533eae8fa75d8 Mon Sep 17 00:00:00 2001
From: Limian Zhang <75445426+rzhang10@users.noreply.github.com>
Date: Thu, 6 Oct 2022 16:30:21 -0700
Subject: [PATCH 2/3] Add todo pseudo code and comment

---
 .../main/java/org/apache/iceberg/data/GenericReader.java | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/data/src/main/java/org/apache/iceberg/data/GenericReader.java b/data/src/main/java/org/apache/iceberg/data/GenericReader.java
index 3637bf00bd58..9af4074402f8 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericReader.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericReader.java
@@ -95,6 +95,13 @@ private CloseableIterable openFile(FileScanTask task, Schema fileProject
     InputFile input = io.newInputFile(task.file().path().toString());
     Map<Integer, ?> partition =
         PartitionUtil.constantsMap(task, IdentityPartitionConverters::convertConstant);
+    // TODO: construct a default-value map from the table schema and merge it with
+    // the partition map, consolidating them into a single id-to-constant map.
+    //
+    // Pseudo-code sketch (ideally `partition` would also be renamed to something
+    // like `idToConstant` once it carries more than partition values):
+    //   Map<Integer, Object> defaultValueMap = ...;
+    //   idToConstant.putAll(defaultValueMap);
 
     switch (task.file().format()) {
       case AVRO:

From 6960f5c2a39ae08d11223ab89b526cfc6a77cbef Mon Sep 17 00:00:00 2001
From: Limian Zhang <75445426+rzhang10@users.noreply.github.com>
Date: Thu, 6 Oct 2022 16:58:03 -0700
Subject: [PATCH 3/3] Add todo in spark constant map construction

---
 .../main/java/org/apache/iceberg/spark/source/BaseReader.java | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index 1eb662e8beea..193360f8a090 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -196,11 +196,15 @@ private EncryptedInputFile toEncryptedInputFile(ContentFile file) {
   }
 
   protected Map<Integer, ?> constantsMap(ContentScanTask<?> task, Schema readSchema) {
+    // TODO: construct a default-value map here (e.g. Map<Integer, Object> defaultMap = ...)
+    // and merge it with the partition column constant map built below.
     if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
       StructType partitionType = Partitioning.partitionType(table);
       return PartitionUtil.constantsMap(task, partitionType, BaseReader::convertConstant);
+      // TODO: merge defaultMap into this map (via a mutable copy) before returning it
     } else {
       return PartitionUtil.constantsMap(task, BaseReader::convertConstant);
+      // TODO: merge defaultMap into this map (via a mutable copy) before returning it
    }
  }
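One possible shape for the merge that both TODOs describe, assuming the initialDefaultValue() accessor named in the BuildAvroProjection comment eventually lands on NestedField; this is a sketch of intent, not the final API:

    // Sketch only: build a single id-to-constant map that layers schema defaults
    // under partition constants. initialDefaultValue() is the planned, not yet
    // existing accessor; task and readSchema come from the enclosing
    // constantsMap method.
    Map<Integer, Object> idToConstant = Maps.newHashMap();
    for (Types.NestedField field : readSchema.columns()) {
      if (field.initialDefaultValue() != null) {
        idToConstant.put(
            field.fieldId(), convertConstant(field.type(), field.initialDefaultValue()));
      }
    }
    // Partition constants take precedence over schema defaults, and the merge
    // happens in a mutable copy because Map.putAll returns void and the
    // constantsMap result may be immutable.
    idToConstant.putAll(PartitionUtil.constantsMap(task, BaseReader::convertConstant));
    return idToConstant;

Nested struct fields would need the same treatment recursively.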