From 5b49ea719fd2877c2318fe96f0c70f9388b834b5 Mon Sep 17 00:00:00 2001 From: Limian Zhang <75445426+rzhang10@users.noreply.github.com> Date: Mon, 17 Oct 2022 14:32:06 -0700 Subject: [PATCH 01/11] Add default value API --- .../apache/iceberg/types/PruneColumns.java | 6 ++- .../java/org/apache/iceberg/types/Types.java | 51 +++++++++++++++---- 2 files changed, 46 insertions(+), 11 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/types/PruneColumns.java b/api/src/main/java/org/apache/iceberg/types/PruneColumns.java index daf2e6bbc0ca..75d92deb9118 100644 --- a/api/src/main/java/org/apache/iceberg/types/PruneColumns.java +++ b/api/src/main/java/org/apache/iceberg/types/PruneColumns.java @@ -68,11 +68,13 @@ public Type struct(Types.StructType struct, List fieldResults) { if (field.isOptional()) { selectedFields.add( Types.NestedField.optional( - field.fieldId(), field.name(), projectedType, field.doc())); + field.fieldId(), field.name(), projectedType, field.doc(), + field.initialDefault(), field.writeDefault())); } else { selectedFields.add( Types.NestedField.required( - field.fieldId(), field.name(), projectedType, field.doc())); + field.fieldId(), field.name(), projectedType, field.doc(), + field.initialDefault(), field.writeDefault())); } } } diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java b/api/src/main/java/org/apache/iceberg/types/Types.java index da70dd9ac6ab..5bf08ca57e3d 100644 --- a/api/src/main/java/org/apache/iceberg/types/Types.java +++ b/api/src/main/java/org/apache/iceberg/types/Types.java @@ -413,27 +413,42 @@ public int hashCode() { public static class NestedField implements Serializable { public static NestedField optional(int id, String name, Type type) { - return new NestedField(true, id, name, type, null); + return new NestedField(true, id, name, type, null, null, null); } public static NestedField optional(int id, String name, Type type, String doc) { - return new NestedField(true, id, name, type, doc); + return new NestedField(true, id, name, type, doc, null, null); + } + + public static NestedField optional(int id, String name, Type type, String doc, + Object initialDefault, Object writeDefault) { + return new NestedField(true, id, name, type, doc, initialDefault, writeDefault); } public static NestedField required(int id, String name, Type type) { - return new NestedField(false, id, name, type, null); + return new NestedField(false, id, name, type, null, null, null); } public static NestedField required(int id, String name, Type type, String doc) { - return new NestedField(false, id, name, type, doc); + return new NestedField(false, id, name, type, doc, null, null); + } + + public static NestedField required(int id, String name, Type type, String doc, + Object initialDefault, Object writeDefault) { + return new NestedField(false, id, name, type, doc, initialDefault, writeDefault); } public static NestedField of(int id, boolean isOptional, String name, Type type) { - return new NestedField(isOptional, id, name, type, null); + return new NestedField(isOptional, id, name, type, null, null, null); } public static NestedField of(int id, boolean isOptional, String name, Type type, String doc) { - return new NestedField(isOptional, id, name, type, doc); + return new NestedField(isOptional, id, name, type, doc, null, null); + } + + public static NestedField of(int id, boolean isOptional, String name, Type type, String doc, + Object initialDefault, Object writeDefault) { + return new NestedField(isOptional, id, name, type, doc, initialDefault, writeDefault); } private final boolean isOptional; @@ -441,8 +456,12 @@ public static NestedField of(int id, boolean isOptional, String name, Type type, private final String name; private final Type type; private final String doc; + private final Object initialDefault; + private final Object writeDefault; - private NestedField(boolean isOptional, int id, String name, Type type, String doc) { + private NestedField( + boolean isOptional, int id, String name, Type type, String doc, + Object initialDefault, Object writeDefault) { Preconditions.checkNotNull(name, "Name cannot be null"); Preconditions.checkNotNull(type, "Type cannot be null"); this.isOptional = isOptional; @@ -450,6 +469,8 @@ private NestedField(boolean isOptional, int id, String name, Type type, String d this.name = name; this.type = type; this.doc = doc; + this.initialDefault = initialDefault; + this.writeDefault = writeDefault; } public boolean isOptional() { @@ -460,7 +481,7 @@ public NestedField asOptional() { if (isOptional) { return this; } - return new NestedField(true, id, name, type, doc); + return new NestedField(true, id, name, type, doc, initialDefault, writeDefault); } public boolean isRequired() { @@ -471,7 +492,11 @@ public NestedField asRequired() { if (!isOptional) { return this; } - return new NestedField(false, id, name, type, doc); + return new NestedField(false, id, name, type, doc, initialDefault, writeDefault); + } + + public NestedField withWriteDefault(Object newWriteDefault) { + return new NestedField(isOptional, id, name, type, doc, initialDefault, newWriteDefault); } public int fieldId() { @@ -490,6 +515,14 @@ public String doc() { return doc; } + public Object initialDefault() { + return initialDefault; + } + + public Object writeDefault() { + return writeDefault; + } + @Override public String toString() { return String.format("%d: %s: %s %s", id, name, isOptional ? "optional" : "required", type) From 6a3540c3792faf9cda0c9a6c162b04da4500b243 Mon Sep 17 00:00:00 2001 From: Limian Zhang <75445426+rzhang10@users.noreply.github.com> Date: Mon, 17 Oct 2022 16:34:59 -0700 Subject: [PATCH 02/11] Data: Support reading default values from generic Avro readers --- .../iceberg/avro/BuildAvroProjection.java | 12 +- .../org/apache/iceberg/avro/ValueReaders.java | 8 +- .../apache/iceberg/data/avro/DataReader.java | 9 +- .../iceberg/data/avro/GenericReaders.java | 15 +- .../apache/iceberg/data/DataTestHelpers.java | 12 ++ .../data/avro/TestReadDefaultValues.java | 167 ++++++++++++++++++ .../iceberg/flink/data/FlinkAvroReader.java | 2 +- .../iceberg/flink/data/FlinkValueReaders.java | 9 +- .../iceberg/spark/data/SparkAvroReader.java | 2 +- .../iceberg/spark/data/SparkValueReaders.java | 9 +- 10 files changed, 225 insertions(+), 20 deletions(-) create mode 100644 data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java index c5c78dd1472a..1f86c0afa2bc 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java +++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java @@ -103,9 +103,17 @@ public Schema record(Schema record, List names, Iterable s } else { Preconditions.checkArgument( - field.isOptional() || MetadataColumns.metadataFieldIds().contains(field.fieldId()), - "Missing required field: %s", + (field.isRequired() && field.initialDefault() != null) + || field.isOptional() + || MetadataColumns.metadataFieldIds().contains(field.fieldId()), + "Missing required field that doesn't have a default value: %s", field.name()); + // If the field from Iceberg schema has initial default value, we just pass and don't + // project it to the avro file read schema with the generated _r field name, + // the default value will be directly read from the Iceberg layer + if (field.initialDefault() != null) { + continue; + } // Create a field that will be defaulted to null. We assign a unique suffix to the field // to make sure that even if records in the file have the field it is not projected. Schema.Field newField = diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java index 19789cce82fc..59f499ed9ed8 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java @@ -595,7 +595,10 @@ protected StructReader(List> readers, Schema schema) { } protected StructReader( - List> readers, Types.StructType struct, Map idToConstant) { + List> readers, + Types.StructType struct, + Schema record, + Map idToConstant) { this.readers = readers.toArray(new ValueReader[0]); List fields = struct.fields(); @@ -612,6 +615,9 @@ protected StructReader( } else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) { positionList.add(pos); constantList.add(false); + } else if (record.getField(field.name()) == null && field.initialDefault() != null) { + positionList.add(pos); + constantList.add(field.initialDefault()); } } diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java index 1cc901d15bc1..2f12fae613b6 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java @@ -80,8 +80,11 @@ public void setRowPositionSupplier(Supplier posSupplier) { } protected ValueReader createStructReader( - Types.StructType struct, List> fields, Map idToConstant) { - return GenericReaders.struct(struct, fields, idToConstant); + Types.StructType struct, + Schema record, + List> fields, + Map idToConstant) { + return GenericReaders.struct(struct, record, fields, idToConstant); } private class ReadBuilder extends AvroSchemaWithTypeVisitor> { @@ -94,7 +97,7 @@ private ReadBuilder(Map idToConstant) { @Override public ValueReader record( Types.StructType struct, Schema record, List names, List> fields) { - return createStructReader(struct, fields, idToConstant); + return createStructReader(struct, record, fields, idToConstant); } @Override diff --git a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java index 91a728d53d38..4c41ecaa59f2 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java @@ -25,6 +25,7 @@ import java.time.OffsetDateTime; import java.util.List; import java.util.Map; +import org.apache.avro.Schema; import org.apache.avro.io.Decoder; import org.apache.iceberg.avro.ValueReader; import org.apache.iceberg.avro.ValueReaders; @@ -53,8 +54,11 @@ static ValueReader timestamptz() { } static ValueReader struct( - StructType struct, List> readers, Map idToConstant) { - return new GenericRecordReader(readers, struct, idToConstant); + StructType struct, + Schema record, + List> readers, + Map idToConstant) { + return new GenericRecordReader(readers, struct, record, idToConstant); } private static class DateReader implements ValueReader { @@ -105,8 +109,11 @@ private static class GenericRecordReader extends ValueReaders.StructReader> readers, StructType struct, Map idToConstant) { - super(readers, struct, idToConstant); + List> readers, + StructType struct, + Schema record, + Map idToConstant) { + super(readers, struct, record, idToConstant); this.structType = struct; } diff --git a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java index d4813ca7bc77..ae5e05923649 100644 --- a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java +++ b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java @@ -18,10 +18,12 @@ */ package org.apache.iceberg.data; +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; import org.assertj.core.api.Assertions; import org.junit.Assert; @@ -88,6 +90,16 @@ private static void assertEquals(Type type, Object expected, Object actual) { "Primitive value should be equal to expected for type " + type, expected, actual); break; case FIXED: + // For fixed type, Iceberg actually represent value as Bytebuffer, + // but since RandomGenericData generates bytearray data for fixed type + // for it to be written to Avro, we add a conversion here to make the + // equality comparison consistent using bytearray + if (expected instanceof ByteBuffer) { + expected = ByteBuffers.toByteArray((ByteBuffer) expected); + } + if (actual instanceof ByteBuffer) { + actual = ByteBuffers.toByteArray((ByteBuffer) actual); + } Assertions.assertThat(expected) .as("Expected should be a byte[]") .isInstanceOf(byte[].class); diff --git a/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java b/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java new file mode 100644 index 000000000000..3cc8b9ececbf --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data.avro; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SingleValueParser; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.avro.AvroIterable; +import org.apache.iceberg.data.DataTestHelpers; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestReadDefaultValues { + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + private static final Object[][] typesWithDefaults = + new Object[][] { + {Types.BooleanType.get(), "true"}, + {Types.IntegerType.get(), "1"}, + {Types.LongType.get(), "9999999"}, + {Types.FloatType.get(), "1.23"}, + {Types.DoubleType.get(), "123.456"}, + {Types.DateType.get(), "\"2007-12-03\""}, + {Types.TimeType.get(), "\"10:15:30\""}, + {Types.TimestampType.withoutZone(), "\"2007-12-03T10:15:30\""}, + {Types.TimestampType.withZone(), "\"2007-12-03T10:15:30+00:00\""}, + {Types.StringType.get(), "\"foo\""}, + {Types.UUIDType.get(), "\"eb26bdb1-a1d8-4aa6-990e-da940875492c\""}, + {Types.FixedType.ofLength(2), "\"111f\""}, + {Types.BinaryType.get(), "\"0000ff\""}, + {Types.DecimalType.of(9, 4), "\"123.4500\""}, + {Types.DecimalType.of(9, 0), "\"2\""}, + // Avro doesn't support negative scale + // {Types.DecimalType.of(9, -20), "\"2E+20\""}, + {Types.ListType.ofOptional(1, Types.IntegerType.get()), "[1, 2, 3]"}, + { + Types.MapType.ofOptional(2, 3, Types.IntegerType.get(), Types.StringType.get()), + "{\"keys\": [1, 2], \"values\": [\"foo\", \"bar\"]}" + }, + { + Types.StructType.of( + required(4, "f1", Types.IntegerType.get()), + optional(5, "f2", Types.StringType.get())), + "{\"4\": 1, \"5\": \"bar\"}" + }, + // deeply nested complex types + { + Types.ListType.ofOptional( + 6, + Types.StructType.of( + required(7, "f1", Types.IntegerType.get()), + optional(8, "f2", Types.StringType.get()))), + "[{\"7\": 1, \"8\": \"bar\"}, {\"7\": 2, \"8\": " + "\"foo\"}]" + }, + { + Types.MapType.ofOptional( + 9, + 10, + Types.IntegerType.get(), + Types.StructType.of( + required(11, "f1", Types.IntegerType.get()), + optional(12, "f2", Types.StringType.get()))), + "{\"keys\": [1, 2], \"values\": [{\"11\": 1, \"12\": \"bar\"}, {\"11\": 2, \"12\": \"foo\"}]}" + }, + { + Types.StructType.of( + required( + 13, + "f1", + Types.StructType.of( + optional(14, "ff1", Types.IntegerType.get()), + optional(15, "ff2", Types.StringType.get()))), + optional( + 16, + "f2", + Types.StructType.of( + optional(17, "ff1", Types.StringType.get()), + optional(18, "ff2", Types.IntegerType.get())))), + "{\"13\": {\"14\": 1, \"15\": \"bar\"}, \"16\": {\"17\": \"bar\", \"18\": 1}}" + }, + }; + + @Test + public void writeAndValidate() throws IOException { + for (Object[] typeWithDefault : typesWithDefaults) { + Type type = (Type) typeWithDefault[0]; + String defaultValueJson = (String) typeWithDefault[1]; + Object defaultValue = SingleValueParser.fromJson(type, defaultValueJson); + + Schema writerSchema = new Schema(required(999, "col1", Types.IntegerType.get())); + + Schema readerSchema = + new Schema( + required(999, "col1", Types.IntegerType.get()), + optional(1000, "col2", type, null, defaultValue, defaultValue)); + + List generatedRecords = RandomGenericData.generate(writerSchema, 100, 0L); + List expected = Lists.newArrayList(); + for (Record record : generatedRecords) { + Record r = GenericRecord.create(readerSchema); + r.set(0, record.get(0)); + r.set(1, defaultValue); + expected.add(r); + } + + File testFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + try (FileAppender writer = + Avro.write(Files.localOutput(testFile)) + .schema(writerSchema) + .createWriterFunc(DataWriter::create) + .named("test") + .build()) { + for (Record rec : generatedRecords) { + writer.add(rec); + } + } + + List rows; + try (AvroIterable reader = + Avro.read(Files.localInput(testFile)) + .project(readerSchema) + .createReaderFunc(DataReader::create) + .build()) { + rows = Lists.newArrayList(reader); + } + + for (int i = 0; i < expected.size(); i += 1) { + DataTestHelpers.assertEquals(readerSchema.asStruct(), expected.get(i), rows.get(i)); + } + } + } +} diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java index 86404959735a..00679a165f1f 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java @@ -83,7 +83,7 @@ private ReadBuilder(Map idToConstant) { @Override public ValueReader record( Types.StructType expected, Schema record, List names, List> fields) { - return FlinkValueReaders.struct(fields, expected.asStructType(), idToConstant); + return FlinkValueReaders.struct(fields, expected.asStructType(), record, idToConstant); } @Override diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java index 32f6c3a2ccfd..f55620928d63 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java @@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; +import org.apache.avro.Schema; import org.apache.avro.io.Decoder; import org.apache.avro.util.Utf8; import org.apache.flink.table.data.ArrayData; @@ -87,8 +88,8 @@ static ValueReader map(ValueReader keyReader, ValueReader valueRe } static ValueReader struct( - List> readers, Types.StructType struct, Map idToConstant) { - return new StructReader(readers, struct, idToConstant); + List> readers, Types.StructType struct, Schema record, Map idToConstant) { + return new StructReader(readers, struct, record, idToConstant); } private static class StringReader implements ValueReader { @@ -286,8 +287,8 @@ private static class StructReader extends ValueReaders.StructReader { private final int numFields; private StructReader( - List> readers, Types.StructType struct, Map idToConstant) { - super(readers, struct, idToConstant); + List> readers, Types.StructType struct, Schema record, Map idToConstant) { + super(readers, struct, record, idToConstant); this.numFields = readers.size(); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java index 4622d2928ac4..8caaffd00bdb 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java @@ -83,7 +83,7 @@ private ReadBuilder(Map idToConstant) { @Override public ValueReader record( Types.StructType expected, Schema record, List names, List> fields) { - return SparkValueReaders.struct(fields, expected, idToConstant); + return SparkValueReaders.struct(fields, expected, record, idToConstant); } @Override diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java index 3cbf38d88bf4..17f183c490e9 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java @@ -26,6 +26,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; +import org.apache.avro.Schema; import org.apache.avro.io.Decoder; import org.apache.avro.util.Utf8; import org.apache.iceberg.avro.ValueReader; @@ -75,8 +76,8 @@ static ValueReader map(ValueReader keyReader, ValueReader< } static ValueReader struct( - List> readers, Types.StructType struct, Map idToConstant) { - return new StructReader(readers, struct, idToConstant); + List> readers, Types.StructType struct, Schema record, Map idToConstant) { + return new StructReader(readers, struct, record, idToConstant); } private static class StringReader implements ValueReader { @@ -253,8 +254,8 @@ static class StructReader extends ValueReaders.StructReader { private final int numFields; protected StructReader( - List> readers, Types.StructType struct, Map idToConstant) { - super(readers, struct, idToConstant); + List> readers, Types.StructType struct, Schema record, Map idToConstant) { + super(readers, struct, record, idToConstant); this.numFields = readers.size(); } From cb5e31a3dba6ce68562ef7f12a189d075df0812c Mon Sep 17 00:00:00 2001 From: Limian Zhang <75445426+rzhang10@users.noreply.github.com> Date: Mon, 17 Oct 2022 16:54:42 -0700 Subject: [PATCH 03/11] SpotlessApply --- .../apache/iceberg/types/PruneColumns.java | 16 ++++++++--- .../java/org/apache/iceberg/types/Types.java | 27 +++++++++++++------ 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/types/PruneColumns.java b/api/src/main/java/org/apache/iceberg/types/PruneColumns.java index 75d92deb9118..52b01c71e1b6 100644 --- a/api/src/main/java/org/apache/iceberg/types/PruneColumns.java +++ b/api/src/main/java/org/apache/iceberg/types/PruneColumns.java @@ -68,13 +68,21 @@ public Type struct(Types.StructType struct, List fieldResults) { if (field.isOptional()) { selectedFields.add( Types.NestedField.optional( - field.fieldId(), field.name(), projectedType, field.doc(), - field.initialDefault(), field.writeDefault())); + field.fieldId(), + field.name(), + projectedType, + field.doc(), + field.initialDefault(), + field.writeDefault())); } else { selectedFields.add( Types.NestedField.required( - field.fieldId(), field.name(), projectedType, field.doc(), - field.initialDefault(), field.writeDefault())); + field.fieldId(), + field.name(), + projectedType, + field.doc(), + field.initialDefault(), + field.writeDefault())); } } } diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java b/api/src/main/java/org/apache/iceberg/types/Types.java index 5bf08ca57e3d..5a813021ab0d 100644 --- a/api/src/main/java/org/apache/iceberg/types/Types.java +++ b/api/src/main/java/org/apache/iceberg/types/Types.java @@ -420,8 +420,8 @@ public static NestedField optional(int id, String name, Type type, String doc) { return new NestedField(true, id, name, type, doc, null, null); } - public static NestedField optional(int id, String name, Type type, String doc, - Object initialDefault, Object writeDefault) { + public static NestedField optional( + int id, String name, Type type, String doc, Object initialDefault, Object writeDefault) { return new NestedField(true, id, name, type, doc, initialDefault, writeDefault); } @@ -433,8 +433,8 @@ public static NestedField required(int id, String name, Type type, String doc) { return new NestedField(false, id, name, type, doc, null, null); } - public static NestedField required(int id, String name, Type type, String doc, - Object initialDefault, Object writeDefault) { + public static NestedField required( + int id, String name, Type type, String doc, Object initialDefault, Object writeDefault) { return new NestedField(false, id, name, type, doc, initialDefault, writeDefault); } @@ -446,8 +446,14 @@ public static NestedField of(int id, boolean isOptional, String name, Type type, return new NestedField(isOptional, id, name, type, doc, null, null); } - public static NestedField of(int id, boolean isOptional, String name, Type type, String doc, - Object initialDefault, Object writeDefault) { + public static NestedField of( + int id, + boolean isOptional, + String name, + Type type, + String doc, + Object initialDefault, + Object writeDefault) { return new NestedField(isOptional, id, name, type, doc, initialDefault, writeDefault); } @@ -460,8 +466,13 @@ public static NestedField of(int id, boolean isOptional, String name, Type type, private final Object writeDefault; private NestedField( - boolean isOptional, int id, String name, Type type, String doc, - Object initialDefault, Object writeDefault) { + boolean isOptional, + int id, + String name, + Type type, + String doc, + Object initialDefault, + Object writeDefault) { Preconditions.checkNotNull(name, "Name cannot be null"); Preconditions.checkNotNull(type, "Type cannot be null"); this.isOptional = isOptional; From 251e1381fdae703aff17b05b6f1e3300a797548d Mon Sep 17 00:00:00 2001 From: Limian Zhang <75445426+rzhang10@users.noreply.github.com> Date: Thu, 20 Oct 2022 11:28:03 -0700 Subject: [PATCH 04/11] FIx testMissingRequiredFields --- .../test/java/org/apache/iceberg/avro/TestAvroNameMapping.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java index c6bdb88b7d97..fe348117d9b9 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java @@ -205,7 +205,7 @@ public void testMissingRequiredFields() { AssertHelpers.assertThrows( "Missing required field in nameMapping", IllegalArgumentException.class, - "Missing required field: x", + "Missing required field that doesn't have a default value: x", // In this case, pruneColumns result is an empty record () -> writeAndRead(writeSchema, readSchema, record, nameMapping)); } From b20b7651bead943a98cb824b1e984ea75e93162f Mon Sep 17 00:00:00 2001 From: Walaa Eldin Moustafa Date: Sun, 2 Apr 2023 11:12:28 -0700 Subject: [PATCH 05/11] Simplify some APIs and address review comments --- .palantir/revapi.yml | 7 +++ .../java/org/apache/iceberg/types/Types.java | 4 +- .../org/apache/iceberg/SingleValueParser.java | 27 +++++--- .../iceberg/avro/BuildAvroProjection.java | 8 +-- .../org/apache/iceberg/avro/ValueReaders.java | 63 ++++++++++++------- .../iceberg/data/avro/GenericReaders.java | 2 +- .../iceberg/avro/TestAvroNameMapping.java | 2 +- .../apache/iceberg/data/DataTestHelpers.java | 12 ---- .../data/avro/TestReadDefaultValues.java | 41 ++++++++++-- .../iceberg/flink/data/FlinkAvroReader.java | 2 +- .../iceberg/flink/data/FlinkValueReaders.java | 9 ++- .../iceberg/spark/data/SparkAvroReader.java | 2 +- .../iceberg/spark/data/SparkValueReaders.java | 9 ++- 13 files changed, 119 insertions(+), 69 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 053529a21c47..fde647c85be3 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -735,6 +735,10 @@ acceptedBreaks: old: "class org.apache.iceberg.PartitionKey" new: "class org.apache.iceberg.PartitionKey" justification: "Serialization across versions is not supported" + - code: "java.class.defaultSerializationChanged" + old: "class org.apache.iceberg.types.Types.NestedField" + new: "class org.apache.iceberg.types.Types.NestedField" + justification: "Serialization across versions is not supported" - code: "java.class.removed" old: "interface org.apache.iceberg.Rollback" justification: "Deprecations for 1.0 release" @@ -744,6 +748,9 @@ acceptedBreaks: - code: "java.method.addedToInterface" new: "method java.util.List org.apache.iceberg.Table::statisticsFiles()" justification: "new API method" + - code: "java.method.addedToInterface" + new: "method org.apache.iceberg.TableScan org.apache.iceberg.TableScan::useRef(java.lang.String)" + justification: "Adding table scan APIs to support scanning from refs" - code: "java.method.removed" old: "method java.lang.Iterable org.apache.iceberg.Snapshot::addedFiles()" justification: "Deprecations for 1.0 release" diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java b/api/src/main/java/org/apache/iceberg/types/Types.java index 5a813021ab0d..68fca7c4eaf5 100644 --- a/api/src/main/java/org/apache/iceberg/types/Types.java +++ b/api/src/main/java/org/apache/iceberg/types/Types.java @@ -420,7 +420,7 @@ public static NestedField optional(int id, String name, Type type, String doc) { return new NestedField(true, id, name, type, doc, null, null); } - public static NestedField optional( + protected static NestedField optional( int id, String name, Type type, String doc, Object initialDefault, Object writeDefault) { return new NestedField(true, id, name, type, doc, initialDefault, writeDefault); } @@ -465,7 +465,7 @@ public static NestedField of( private final Object initialDefault; private final Object writeDefault; - private NestedField( + protected NestedField( boolean isOptional, int id, String name, diff --git a/core/src/main/java/org/apache/iceberg/SingleValueParser.java b/core/src/main/java/org/apache/iceberg/SingleValueParser.java index 3de6a0bcc663..35f92f32b314 100644 --- a/core/src/main/java/org/apache/iceberg/SingleValueParser.java +++ b/core/src/main/java/org/apache/iceberg/SingleValueParser.java @@ -153,7 +153,7 @@ public static Object fromJson(Type type, JsonNode defaultValue) { defaultLength); byte[] fixedBytes = BaseEncoding.base16().decode(defaultValue.textValue().toUpperCase(Locale.ROOT)); - return ByteBuffer.wrap(fixedBytes); + return fixedBytes; case BINARY: Preconditions.checkArgument( defaultValue.isTextual(), "Cannot parse default as a %s value: %s", type, defaultValue); @@ -236,7 +236,7 @@ public static String toJson(Type type, Object defaultValue, boolean pretty) { return JsonUtil.generate(gen -> toJson(type, defaultValue, gen), pretty); } - @SuppressWarnings("checkstyle:MethodLength") + @SuppressWarnings({"checkstyle:MethodLength", "checkstyle:CyclomaticComplexity"}) public static void toJson(Type type, Object defaultValue, JsonGenerator generator) throws IOException { if (defaultValue == null) { @@ -303,17 +303,28 @@ public static void toJson(Type type, Object defaultValue, JsonGenerator generato generator.writeString(defaultValue.toString()); break; case FIXED: + // Normally, FIXED is backed by a byte[], but it can also be backed by a ByteBuffer in the + // case of Fixed Literals. Ideally, Fixed Literals would be backed by a byte[], but that + // would make the APIs backwards incompatible. + // See {@link org.apache.iceberg.expressions.Literals.FixedLiteral}. Preconditions.checkArgument( - defaultValue instanceof ByteBuffer, "Invalid default %s value: %s", type, defaultValue); - ByteBuffer byteBufferValue = (ByteBuffer) defaultValue; + defaultValue instanceof byte[] || defaultValue instanceof ByteBuffer, + "Invalid default %s value: %s", + type, + defaultValue); + byte[] byteArrayValue; + if (defaultValue instanceof ByteBuffer) { + byteArrayValue = ByteBuffers.toByteArray((ByteBuffer) defaultValue); + } else { + byteArrayValue = (byte[]) defaultValue; + } int expectedLength = ((Types.FixedType) type).length(); Preconditions.checkArgument( - byteBufferValue.remaining() == expectedLength, + byteArrayValue.length == expectedLength, "Invalid default %s value, incorrect length: %s", type, - byteBufferValue.remaining()); - generator.writeString( - BaseEncoding.base16().encode(ByteBuffers.toByteArray(byteBufferValue))); + byteArrayValue.length); + generator.writeString(BaseEncoding.base16().encode(byteArrayValue)); break; case BINARY: Preconditions.checkArgument( diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java index 1f86c0afa2bc..c731ee1d1b11 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java +++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java @@ -106,14 +106,14 @@ public Schema record(Schema record, List names, Iterable s (field.isRequired() && field.initialDefault() != null) || field.isOptional() || MetadataColumns.metadataFieldIds().contains(field.fieldId()), - "Missing required field that doesn't have a default value: %s", + "Missing required field: %s", field.name()); - // If the field from Iceberg schema has initial default value, we just pass and don't - // project it to the avro file read schema with the generated _r field name, - // the default value will be directly read from the Iceberg layer + // If the field has an initial default value, do not project it in the read schema. + // The default will be added in {@link org.apache.iceberg.avro.ValueReader}. if (field.initialDefault() != null) { continue; } + // Create a field that will be defaulted to null. We assign a unique suffix to the field // to make sure that even if records in the file have the field it is not projected. Schema.Field newField = diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java index 59f499ed9ed8..f7254d0aaec1 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java @@ -566,8 +566,10 @@ public Map read(Decoder decoder, Object reuse) throws IOException { public abstract static class StructReader implements ValueReader, SupportsRowPosition { private final ValueReader[] readers; - private final int[] positions; - private final Object[] constants; + private final int[] constantValuesPositions; + private final Object[] constantValues; + private final int[] defaultValuesPositions; + private final Object[] defaultValues; private int posField = -1; protected StructReader(List> readers, Schema schema) { @@ -586,43 +588,51 @@ protected StructReader(List> readers, Schema schema) { } if (isDeletedColumnPos == null) { - this.positions = new int[0]; - this.constants = new Object[0]; + this.constantValuesPositions = new int[0]; + this.constantValues = new Object[0]; } else { - this.positions = new int[] {isDeletedColumnPos}; - this.constants = new Object[] {false}; + this.constantValuesPositions = new int[] {isDeletedColumnPos}; + this.constantValues = new Object[] {false}; } + this.defaultValuesPositions = new int[0]; + this.defaultValues = new Object[0]; } protected StructReader( - List> readers, - Types.StructType struct, - Schema record, - Map idToConstant) { + List> readers, Types.StructType struct, Map idToConstant) { this.readers = readers.toArray(new ValueReader[0]); List fields = struct.fields(); - List positionList = Lists.newArrayListWithCapacity(fields.size()); - List constantList = Lists.newArrayListWithCapacity(fields.size()); + List constantValuesPositionList = Lists.newArrayListWithCapacity(fields.size()); + List constantValuesList = Lists.newArrayListWithCapacity(fields.size()); + List defaultValuesPositionList = Lists.newArrayListWithCapacity(fields.size()); + List defaultValuesList = Lists.newArrayListWithCapacity(fields.size()); for (int pos = 0; pos < fields.size(); pos += 1) { Types.NestedField field = fields.get(pos); if (idToConstant.containsKey(field.fieldId())) { - positionList.add(pos); - constantList.add(idToConstant.get(field.fieldId())); + constantValuesPositionList.add(pos); + constantValuesList.add(idToConstant.get(field.fieldId())); } else if (field.fieldId() == MetadataColumns.ROW_POSITION.fieldId()) { // track where the _pos field is located for setRowPositionSupplier this.posField = pos; } else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) { - positionList.add(pos); - constantList.add(false); - } else if (record.getField(field.name()) == null && field.initialDefault() != null) { - positionList.add(pos); - constantList.add(field.initialDefault()); + constantValuesPositionList.add(pos); + constantValuesList.add(false); + } else if (field.initialDefault() != null) { + // Add a constant value for fields that have a default value. + // In the {@link #read()} method, this will be leveraged only if there is no corresponding + // reader. + defaultValuesPositionList.add(pos); + defaultValuesList.add(field.initialDefault()); } } - this.positions = positionList.stream().mapToInt(Integer::intValue).toArray(); - this.constants = constantList.toArray(); + this.constantValuesPositions = + constantValuesPositionList.stream().mapToInt(Integer::intValue).toArray(); + this.constantValues = constantValuesList.toArray(); + this.defaultValuesPositions = + defaultValuesPositionList.stream().mapToInt(Integer::intValue).toArray(); + this.defaultValues = defaultValuesList.toArray(); } @Override @@ -659,13 +669,18 @@ public ValueReader reader(int pos) { public S read(Decoder decoder, Object reuse) throws IOException { S struct = reuseOrCreate(reuse); + // Set the default values first. Setting default values first allows them to be overridden + // once the data is read from the file if the corresponding field is present in the file. + for (int i = 0; i < defaultValuesPositions.length; i += 1) { + set(struct, defaultValuesPositions[i], defaultValues[i]); + } + if (decoder instanceof ResolvingDecoder) { // this may not set all of the fields. nulls are set by default. for (Schema.Field field : ((ResolvingDecoder) decoder).readFieldOrder()) { Object reusedValue = get(struct, field.pos()); set(struct, field.pos(), readers[field.pos()].read(decoder, reusedValue)); } - } else { for (int i = 0; i < readers.length; i += 1) { Object reusedValue = get(struct, i); @@ -673,8 +688,8 @@ public S read(Decoder decoder, Object reuse) throws IOException { } } - for (int i = 0; i < positions.length; i += 1) { - set(struct, positions[i], constants[i]); + for (int i = 0; i < constantValuesPositions.length; i += 1) { + set(struct, constantValuesPositions[i], constantValues[i]); } return struct; diff --git a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java index 4c41ecaa59f2..631c50486564 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java @@ -113,7 +113,7 @@ private GenericRecordReader( StructType struct, Schema record, Map idToConstant) { - super(readers, struct, record, idToConstant); + super(readers, struct, idToConstant); this.structType = struct; } diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java index fe348117d9b9..c6bdb88b7d97 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java @@ -205,7 +205,7 @@ public void testMissingRequiredFields() { AssertHelpers.assertThrows( "Missing required field in nameMapping", IllegalArgumentException.class, - "Missing required field that doesn't have a default value: x", + "Missing required field: x", // In this case, pruneColumns result is an empty record () -> writeAndRead(writeSchema, readSchema, record, nameMapping)); } diff --git a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java index ae5e05923649..d4813ca7bc77 100644 --- a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java +++ b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java @@ -18,12 +18,10 @@ */ package org.apache.iceberg.data; -import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.ByteBuffers; import org.assertj.core.api.Assertions; import org.junit.Assert; @@ -90,16 +88,6 @@ private static void assertEquals(Type type, Object expected, Object actual) { "Primitive value should be equal to expected for type " + type, expected, actual); break; case FIXED: - // For fixed type, Iceberg actually represent value as Bytebuffer, - // but since RandomGenericData generates bytearray data for fixed type - // for it to be written to Avro, we add a conversion here to make the - // equality comparison consistent using bytearray - if (expected instanceof ByteBuffer) { - expected = ByteBuffers.toByteArray((ByteBuffer) expected); - } - if (actual instanceof ByteBuffer) { - actual = ByteBuffers.toByteArray((ByteBuffer) actual); - } Assertions.assertThat(expected) .as("Expected should be a byte[]") .isInstanceOf(byte[].class); diff --git a/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java b/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java index 3cc8b9ececbf..0f98d2e5712f 100644 --- a/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java +++ b/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java @@ -125,15 +125,16 @@ public void writeAndValidate() throws IOException { Schema readerSchema = new Schema( required(999, "col1", Types.IntegerType.get()), - optional(1000, "col2", type, null, defaultValue, defaultValue)); + NestedFieldWithInitialDefault.optionalWithDefault( + 1000, "col2", type, null, defaultValue, defaultValue)); List generatedRecords = RandomGenericData.generate(writerSchema, 100, 0L); List expected = Lists.newArrayList(); for (Record record : generatedRecords) { - Record r = GenericRecord.create(readerSchema); - r.set(0, record.get(0)); - r.set(1, defaultValue); - expected.add(r); + Record expectedRecord = GenericRecord.create(readerSchema); + expectedRecord.set(0, record.get(0)); + expectedRecord.set(1, defaultValue); + expected.add(expectedRecord); } File testFile = temp.newFile(); @@ -164,4 +165,34 @@ public void writeAndValidate() throws IOException { } } } + + // TODO: This class should be removed once NestedField.optional() that takes initialDefault and + // writeDefault becomes public. It is intentionally package private to avoid exposing it in the + // public API as of now. + static class NestedFieldWithInitialDefault extends Types.NestedField { + private final Object initialDefault; + + NestedFieldWithInitialDefault( + boolean isOptional, + int fieldId, + String name, + Type type, + String doc, + Object initialDefault, + Object writeDefault) { + super(isOptional, fieldId, name, type, doc, initialDefault, writeDefault); + this.initialDefault = initialDefault; + } + + static Types.NestedField optionalWithDefault( + int id, String name, Type type, String doc, Object initialDefault, Object writeDefault) { + return new NestedFieldWithInitialDefault( + true, id, name, type, doc, initialDefault, writeDefault); + } + + @Override + public Object initialDefault() { + return initialDefault; + } + } } diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java index 00679a165f1f..86404959735a 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java @@ -83,7 +83,7 @@ private ReadBuilder(Map idToConstant) { @Override public ValueReader record( Types.StructType expected, Schema record, List names, List> fields) { - return FlinkValueReaders.struct(fields, expected.asStructType(), record, idToConstant); + return FlinkValueReaders.struct(fields, expected.asStructType(), idToConstant); } @Override diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java index f55620928d63..32f6c3a2ccfd 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java @@ -24,7 +24,6 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; -import org.apache.avro.Schema; import org.apache.avro.io.Decoder; import org.apache.avro.util.Utf8; import org.apache.flink.table.data.ArrayData; @@ -88,8 +87,8 @@ static ValueReader map(ValueReader keyReader, ValueReader valueRe } static ValueReader struct( - List> readers, Types.StructType struct, Schema record, Map idToConstant) { - return new StructReader(readers, struct, record, idToConstant); + List> readers, Types.StructType struct, Map idToConstant) { + return new StructReader(readers, struct, idToConstant); } private static class StringReader implements ValueReader { @@ -287,8 +286,8 @@ private static class StructReader extends ValueReaders.StructReader { private final int numFields; private StructReader( - List> readers, Types.StructType struct, Schema record, Map idToConstant) { - super(readers, struct, record, idToConstant); + List> readers, Types.StructType struct, Map idToConstant) { + super(readers, struct, idToConstant); this.numFields = readers.size(); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java index 8caaffd00bdb..4622d2928ac4 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java @@ -83,7 +83,7 @@ private ReadBuilder(Map idToConstant) { @Override public ValueReader record( Types.StructType expected, Schema record, List names, List> fields) { - return SparkValueReaders.struct(fields, expected, record, idToConstant); + return SparkValueReaders.struct(fields, expected, idToConstant); } @Override diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java index 17f183c490e9..3cbf38d88bf4 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java @@ -26,7 +26,6 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; -import org.apache.avro.Schema; import org.apache.avro.io.Decoder; import org.apache.avro.util.Utf8; import org.apache.iceberg.avro.ValueReader; @@ -76,8 +75,8 @@ static ValueReader map(ValueReader keyReader, ValueReader< } static ValueReader struct( - List> readers, Types.StructType struct, Schema record, Map idToConstant) { - return new StructReader(readers, struct, record, idToConstant); + List> readers, Types.StructType struct, Map idToConstant) { + return new StructReader(readers, struct, idToConstant); } private static class StringReader implements ValueReader { @@ -254,8 +253,8 @@ static class StructReader extends ValueReaders.StructReader { private final int numFields; protected StructReader( - List> readers, Types.StructType struct, Schema record, Map idToConstant) { - super(readers, struct, record, idToConstant); + List> readers, Types.StructType struct, Map idToConstant) { + super(readers, struct, idToConstant); this.numFields = readers.size(); } From 70ae07b8766d49915cc505a1cd97205f232fb761 Mon Sep 17 00:00:00 2001 From: Walaa Eldin Moustafa Date: Thu, 27 Apr 2023 23:27:46 -0700 Subject: [PATCH 06/11] Revert changes in SingleValueParser for testing --- .../org/apache/iceberg/SingleValueParser.java | 27 ++++++------------- 1 file changed, 8 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/SingleValueParser.java b/core/src/main/java/org/apache/iceberg/SingleValueParser.java index 35f92f32b314..3de6a0bcc663 100644 --- a/core/src/main/java/org/apache/iceberg/SingleValueParser.java +++ b/core/src/main/java/org/apache/iceberg/SingleValueParser.java @@ -153,7 +153,7 @@ public static Object fromJson(Type type, JsonNode defaultValue) { defaultLength); byte[] fixedBytes = BaseEncoding.base16().decode(defaultValue.textValue().toUpperCase(Locale.ROOT)); - return fixedBytes; + return ByteBuffer.wrap(fixedBytes); case BINARY: Preconditions.checkArgument( defaultValue.isTextual(), "Cannot parse default as a %s value: %s", type, defaultValue); @@ -236,7 +236,7 @@ public static String toJson(Type type, Object defaultValue, boolean pretty) { return JsonUtil.generate(gen -> toJson(type, defaultValue, gen), pretty); } - @SuppressWarnings({"checkstyle:MethodLength", "checkstyle:CyclomaticComplexity"}) + @SuppressWarnings("checkstyle:MethodLength") public static void toJson(Type type, Object defaultValue, JsonGenerator generator) throws IOException { if (defaultValue == null) { @@ -303,28 +303,17 @@ public static void toJson(Type type, Object defaultValue, JsonGenerator generato generator.writeString(defaultValue.toString()); break; case FIXED: - // Normally, FIXED is backed by a byte[], but it can also be backed by a ByteBuffer in the - // case of Fixed Literals. Ideally, Fixed Literals would be backed by a byte[], but that - // would make the APIs backwards incompatible. - // See {@link org.apache.iceberg.expressions.Literals.FixedLiteral}. Preconditions.checkArgument( - defaultValue instanceof byte[] || defaultValue instanceof ByteBuffer, - "Invalid default %s value: %s", - type, - defaultValue); - byte[] byteArrayValue; - if (defaultValue instanceof ByteBuffer) { - byteArrayValue = ByteBuffers.toByteArray((ByteBuffer) defaultValue); - } else { - byteArrayValue = (byte[]) defaultValue; - } + defaultValue instanceof ByteBuffer, "Invalid default %s value: %s", type, defaultValue); + ByteBuffer byteBufferValue = (ByteBuffer) defaultValue; int expectedLength = ((Types.FixedType) type).length(); Preconditions.checkArgument( - byteArrayValue.length == expectedLength, + byteBufferValue.remaining() == expectedLength, "Invalid default %s value, incorrect length: %s", type, - byteArrayValue.length); - generator.writeString(BaseEncoding.base16().encode(byteArrayValue)); + byteBufferValue.remaining()); + generator.writeString( + BaseEncoding.base16().encode(ByteBuffers.toByteArray(byteBufferValue))); break; case BINARY: Preconditions.checkArgument( From a77c857b8be59262f36e9ebb3a4b44db0aba24aa Mon Sep 17 00:00:00 2001 From: Walaa Eldin Moustafa Date: Mon, 15 May 2023 16:24:12 -0700 Subject: [PATCH 07/11] Address review comments --- .../java/org/apache/iceberg/types/Types.java | 4 +- .../org/apache/iceberg/avro/ValueReaders.java | 35 ++++--- .../data/IdentityPartitionConverters.java | 7 +- .../data/avro/TestReadDefaultValues.java | 94 ++++++++++--------- 4 files changed, 74 insertions(+), 66 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java b/api/src/main/java/org/apache/iceberg/types/Types.java index 68fca7c4eaf5..9fcb9c73a222 100644 --- a/api/src/main/java/org/apache/iceberg/types/Types.java +++ b/api/src/main/java/org/apache/iceberg/types/Types.java @@ -420,7 +420,7 @@ public static NestedField optional(int id, String name, Type type, String doc) { return new NestedField(true, id, name, type, doc, null, null); } - protected static NestedField optional( + public static NestedField optional( int id, String name, Type type, String doc, Object initialDefault, Object writeDefault) { return new NestedField(true, id, name, type, doc, initialDefault, writeDefault); } @@ -465,7 +465,7 @@ public static NestedField of( private final Object initialDefault; private final Object writeDefault; - protected NestedField( + public NestedField( boolean isOptional, int id, String name, diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java index f7254d0aaec1..6d559c370c1d 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java @@ -25,14 +25,10 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.Collection; -import java.util.Deque; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.function.Supplier; +import java.util.stream.Collectors; + import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.IndexedRecord; @@ -41,6 +37,7 @@ import org.apache.avro.util.Utf8; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.data.IdentityPartitionConverters; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; @@ -623,7 +620,7 @@ protected StructReader( // In the {@link #read()} method, this will be leveraged only if there is no corresponding // reader. defaultValuesPositionList.add(pos); - defaultValuesList.add(field.initialDefault()); + defaultValuesList.add(IdentityPartitionConverters.convertConstant(field.type(), field.initialDefault())); } } @@ -669,17 +666,19 @@ public ValueReader reader(int pos) { public S read(Decoder decoder, Object reuse) throws IOException { S struct = reuseOrCreate(reuse); - // Set the default values first. Setting default values first allows them to be overridden - // once the data is read from the file if the corresponding field is present in the file. - for (int i = 0; i < defaultValuesPositions.length; i += 1) { - set(struct, defaultValuesPositions[i], defaultValues[i]); - } - if (decoder instanceof ResolvingDecoder) { - // this may not set all of the fields. nulls are set by default. - for (Schema.Field field : ((ResolvingDecoder) decoder).readFieldOrder()) { - Object reusedValue = get(struct, field.pos()); - set(struct, field.pos(), readers[field.pos()].read(decoder, reusedValue)); + Set existingFieldPositionsSet = Arrays.stream(((ResolvingDecoder) decoder).readFieldOrder()).map(Schema.Field::pos).collect(Collectors.toSet()); + // This may not set all of the fields. nulls are set by default. + for (int i = 0; i < existingFieldPositionsSet.size(); i++) { + Object reusedValue = get(struct, i); + set(struct, i, readers[i].read(decoder, reusedValue)); + } + // Set default values + for (int i = 0; i < defaultValuesPositions.length; i += 1) { + // Set default values only if the field does not exist in the data. + if (!existingFieldPositionsSet.contains(defaultValuesPositions[i])) { + set(struct, defaultValuesPositions[i], defaultValues[i]); + } } } else { for (int i = 0; i < readers.length; i += 1) { diff --git a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java index 4cb41263152d..1391a7b0f9b8 100644 --- a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java +++ b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java @@ -21,8 +21,11 @@ import org.apache.avro.generic.GenericData; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.DateTimeUtil; +import java.nio.ByteBuffer; + public class IdentityPartitionConverters { private IdentityPartitionConverters() {} @@ -46,7 +49,9 @@ public static Object convertConstant(Type type, Object value) { return DateTimeUtil.timestampFromMicros((Long) value); } case FIXED: - if (value instanceof GenericData.Fixed) { + if (value instanceof ByteBuffer) { + return ByteBuffers.toByteArray((ByteBuffer) value); + } else if (value instanceof GenericData.Fixed) { return ((GenericData.Fixed) value).bytes(); } return value; diff --git a/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java b/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java index 0f98d2e5712f..c06e8dae0bb6 100644 --- a/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java +++ b/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java @@ -29,10 +29,7 @@ import org.apache.iceberg.SingleValueParser; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.avro.AvroIterable; -import org.apache.iceberg.data.DataTestHelpers; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.RandomGenericData; -import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.*; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type; @@ -114,7 +111,7 @@ public class TestReadDefaultValues { }; @Test - public void writeAndValidate() throws IOException { + public void testDefaultValueApplied() throws IOException { for (Object[] typeWithDefault : typesWithDefaults) { Type type = (Type) typeWithDefault[0]; String defaultValueJson = (String) typeWithDefault[1]; @@ -125,17 +122,12 @@ public void writeAndValidate() throws IOException { Schema readerSchema = new Schema( required(999, "col1", Types.IntegerType.get()), - NestedFieldWithInitialDefault.optionalWithDefault( + Types.NestedField.optional( 1000, "col2", type, null, defaultValue, defaultValue)); - List generatedRecords = RandomGenericData.generate(writerSchema, 100, 0L); - List expected = Lists.newArrayList(); - for (Record record : generatedRecords) { - Record expectedRecord = GenericRecord.create(readerSchema); - expectedRecord.set(0, record.get(0)); - expectedRecord.set(1, defaultValue); - expected.add(expectedRecord); - } + Record expectedRecord = GenericRecord.create(readerSchema); + expectedRecord.set(0, 1); + expectedRecord.set(1, IdentityPartitionConverters.convertConstant(type, defaultValue)); File testFile = temp.newFile(); Assert.assertTrue("Delete should succeed", testFile.delete()); @@ -146,9 +138,9 @@ public void writeAndValidate() throws IOException { .createWriterFunc(DataWriter::create) .named("test") .build()) { - for (Record rec : generatedRecords) { - writer.add(rec); - } + Record record = GenericRecord.create(writerSchema); + record.set(0, 1); + writer.add(record); } List rows; @@ -160,39 +152,51 @@ public void writeAndValidate() throws IOException { rows = Lists.newArrayList(reader); } - for (int i = 0; i < expected.size(); i += 1) { - DataTestHelpers.assertEquals(readerSchema.asStruct(), expected.get(i), rows.get(i)); - } + DataTestHelpers.assertEquals(readerSchema.asStruct(), expectedRecord, rows.get(0)); } } - // TODO: This class should be removed once NestedField.optional() that takes initialDefault and - // writeDefault becomes public. It is intentionally package private to avoid exposing it in the - // public API as of now. - static class NestedFieldWithInitialDefault extends Types.NestedField { - private final Object initialDefault; - - NestedFieldWithInitialDefault( - boolean isOptional, - int fieldId, - String name, - Type type, - String doc, - Object initialDefault, - Object writeDefault) { - super(isOptional, fieldId, name, type, doc, initialDefault, writeDefault); - this.initialDefault = initialDefault; - } + @Test + public void testDefaultValueNotApplied() throws IOException { + for (Object[] typeWithDefault : typesWithDefaults) { + Type type = (Type) typeWithDefault[0]; + String defaultValueJson = (String) typeWithDefault[1]; + Object defaultValue = SingleValueParser.fromJson(type, defaultValueJson); - static Types.NestedField optionalWithDefault( - int id, String name, Type type, String doc, Object initialDefault, Object writeDefault) { - return new NestedFieldWithInitialDefault( - true, id, name, type, doc, initialDefault, writeDefault); - } + Schema readerSchema = + new Schema( + required(999, "col1", Types.IntegerType.get()), + Types.NestedField.optional( + 1000, "col2", type, null, defaultValue, defaultValue)); + + // Create a record with null value for the column with default value + Record record = GenericRecord.create(readerSchema); + record.set(0, 1); + record.set(1, null); + + File testFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + try (FileAppender writer = + Avro.write(Files.localOutput(testFile)) + .schema(readerSchema) + .createWriterFunc(DataWriter::create) + .named("test") + .build()) { + writer.add(record); + } + + List rows; + try (AvroIterable reader = + Avro.read(Files.localInput(testFile)) + .project(readerSchema) + .createReaderFunc(DataReader::create) + .build()) { + rows = Lists.newArrayList(reader); + } - @Override - public Object initialDefault() { - return initialDefault; + // Existence of default value should not affect the read result + DataTestHelpers.assertEquals(readerSchema.asStruct(), record, rows.get(0)); } } } From c9f9c27a41aef574349aa18dde097da7a2573f26 Mon Sep 17 00:00:00 2001 From: Walaa Eldin Moustafa Date: Mon, 15 May 2023 21:26:08 -0700 Subject: [PATCH 08/11] Address review comments --- .../org/apache/iceberg/avro/ValueReaders.java | 45 +++++++++---------- .../data/IdentityPartitionConverters.java | 3 +- .../apache/iceberg/data/avro/DataReader.java | 9 ++-- .../iceberg/data/avro/GenericReaders.java | 13 ++---- .../data/avro/TestReadDefaultValues.java | 33 +++++++------- 5 files changed, 46 insertions(+), 57 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java index 6d559c370c1d..ab3c07326e83 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java @@ -27,8 +27,6 @@ import java.nio.ByteOrder; import java.util.*; import java.util.function.Supplier; -import java.util.stream.Collectors; - import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.IndexedRecord; @@ -563,9 +561,9 @@ public Map read(Decoder decoder, Object reuse) throws IOException { public abstract static class StructReader implements ValueReader, SupportsRowPosition { private final ValueReader[] readers; - private final int[] constantValuesPositions; + private final int[] constantPositions; private final Object[] constantValues; - private final int[] defaultValuesPositions; + private final int[] defaultPositions; private final Object[] defaultValues; private int posField = -1; @@ -585,13 +583,13 @@ protected StructReader(List> readers, Schema schema) { } if (isDeletedColumnPos == null) { - this.constantValuesPositions = new int[0]; + this.constantPositions = new int[0]; this.constantValues = new Object[0]; } else { - this.constantValuesPositions = new int[] {isDeletedColumnPos}; + this.constantPositions = new int[] {isDeletedColumnPos}; this.constantValues = new Object[] {false}; } - this.defaultValuesPositions = new int[0]; + this.defaultPositions = new int[0]; this.defaultValues = new Object[0]; } @@ -600,34 +598,34 @@ protected StructReader( this.readers = readers.toArray(new ValueReader[0]); List fields = struct.fields(); - List constantValuesPositionList = Lists.newArrayListWithCapacity(fields.size()); + List constantPositionsList = Lists.newArrayListWithCapacity(fields.size()); List constantValuesList = Lists.newArrayListWithCapacity(fields.size()); List defaultValuesPositionList = Lists.newArrayListWithCapacity(fields.size()); List defaultValuesList = Lists.newArrayListWithCapacity(fields.size()); for (int pos = 0; pos < fields.size(); pos += 1) { Types.NestedField field = fields.get(pos); if (idToConstant.containsKey(field.fieldId())) { - constantValuesPositionList.add(pos); + constantPositionsList.add(pos); constantValuesList.add(idToConstant.get(field.fieldId())); } else if (field.fieldId() == MetadataColumns.ROW_POSITION.fieldId()) { // track where the _pos field is located for setRowPositionSupplier this.posField = pos; } else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) { - constantValuesPositionList.add(pos); + constantPositionsList.add(pos); constantValuesList.add(false); } else if (field.initialDefault() != null) { // Add a constant value for fields that have a default value. // In the {@link #read()} method, this will be leveraged only if there is no corresponding // reader. defaultValuesPositionList.add(pos); - defaultValuesList.add(IdentityPartitionConverters.convertConstant(field.type(), field.initialDefault())); + defaultValuesList.add( + IdentityPartitionConverters.convertConstant(field.type(), field.initialDefault())); } } - this.constantValuesPositions = - constantValuesPositionList.stream().mapToInt(Integer::intValue).toArray(); + this.constantPositions = constantPositionsList.stream().mapToInt(Integer::intValue).toArray(); this.constantValues = constantValuesList.toArray(); - this.defaultValuesPositions = + this.defaultPositions = defaultValuesPositionList.stream().mapToInt(Integer::intValue).toArray(); this.defaultValues = defaultValuesList.toArray(); } @@ -667,17 +665,18 @@ public S read(Decoder decoder, Object reuse) throws IOException { S struct = reuseOrCreate(reuse); if (decoder instanceof ResolvingDecoder) { - Set existingFieldPositionsSet = Arrays.stream(((ResolvingDecoder) decoder).readFieldOrder()).map(Schema.Field::pos).collect(Collectors.toSet()); + Set existingFieldPositionsSet = new HashSet<>(); // This may not set all of the fields. nulls are set by default. - for (int i = 0; i < existingFieldPositionsSet.size(); i++) { - Object reusedValue = get(struct, i); - set(struct, i, readers[i].read(decoder, reusedValue)); + for (Schema.Field field : ((ResolvingDecoder) decoder).readFieldOrder()) { + Object reusedValue = get(struct, field.pos()); + set(struct, field.pos(), readers[field.pos()].read(decoder, reusedValue)); + existingFieldPositionsSet.add(field.pos()); } // Set default values - for (int i = 0; i < defaultValuesPositions.length; i += 1) { + for (int i = 0; i < defaultPositions.length; i += 1) { // Set default values only if the field does not exist in the data. - if (!existingFieldPositionsSet.contains(defaultValuesPositions[i])) { - set(struct, defaultValuesPositions[i], defaultValues[i]); + if (!existingFieldPositionsSet.contains(defaultPositions[i])) { + set(struct, defaultPositions[i], defaultValues[i]); } } } else { @@ -687,8 +686,8 @@ public S read(Decoder decoder, Object reuse) throws IOException { } } - for (int i = 0; i < constantValuesPositions.length; i += 1) { - set(struct, constantValuesPositions[i], constantValues[i]); + for (int i = 0; i < constantPositions.length; i += 1) { + set(struct, constantPositions[i], constantValues[i]); } return struct; diff --git a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java index 1391a7b0f9b8..c19d75f0003f 100644 --- a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java +++ b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java @@ -18,14 +18,13 @@ */ package org.apache.iceberg.data; +import java.nio.ByteBuffer; import org.apache.avro.generic.GenericData; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.DateTimeUtil; -import java.nio.ByteBuffer; - public class IdentityPartitionConverters { private IdentityPartitionConverters() {} diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java index 2f12fae613b6..1cc901d15bc1 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java @@ -80,11 +80,8 @@ public void setRowPositionSupplier(Supplier posSupplier) { } protected ValueReader createStructReader( - Types.StructType struct, - Schema record, - List> fields, - Map idToConstant) { - return GenericReaders.struct(struct, record, fields, idToConstant); + Types.StructType struct, List> fields, Map idToConstant) { + return GenericReaders.struct(struct, fields, idToConstant); } private class ReadBuilder extends AvroSchemaWithTypeVisitor> { @@ -97,7 +94,7 @@ private ReadBuilder(Map idToConstant) { @Override public ValueReader record( Types.StructType struct, Schema record, List names, List> fields) { - return createStructReader(struct, record, fields, idToConstant); + return createStructReader(struct, fields, idToConstant); } @Override diff --git a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java index 631c50486564..91a728d53d38 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java @@ -25,7 +25,6 @@ import java.time.OffsetDateTime; import java.util.List; import java.util.Map; -import org.apache.avro.Schema; import org.apache.avro.io.Decoder; import org.apache.iceberg.avro.ValueReader; import org.apache.iceberg.avro.ValueReaders; @@ -54,11 +53,8 @@ static ValueReader timestamptz() { } static ValueReader struct( - StructType struct, - Schema record, - List> readers, - Map idToConstant) { - return new GenericRecordReader(readers, struct, record, idToConstant); + StructType struct, List> readers, Map idToConstant) { + return new GenericRecordReader(readers, struct, idToConstant); } private static class DateReader implements ValueReader { @@ -109,10 +105,7 @@ private static class GenericRecordReader extends ValueReaders.StructReader> readers, - StructType struct, - Schema record, - Map idToConstant) { + List> readers, StructType struct, Map idToConstant) { super(readers, struct, idToConstant); this.structType = struct; } diff --git a/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java b/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java index c06e8dae0bb6..8b7cee9bcba6 100644 --- a/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java +++ b/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java @@ -29,7 +29,10 @@ import org.apache.iceberg.SingleValueParser; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.avro.AvroIterable; -import org.apache.iceberg.data.*; +import org.apache.iceberg.data.DataTestHelpers; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.IdentityPartitionConverters; +import org.apache.iceberg.data.Record; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type; @@ -122,8 +125,7 @@ public void testDefaultValueApplied() throws IOException { Schema readerSchema = new Schema( required(999, "col1", Types.IntegerType.get()), - Types.NestedField.optional( - 1000, "col2", type, null, defaultValue, defaultValue)); + Types.NestedField.optional(1000, "col2", type, null, defaultValue, defaultValue)); Record expectedRecord = GenericRecord.create(readerSchema); expectedRecord.set(0, 1); @@ -164,10 +166,9 @@ public void testDefaultValueNotApplied() throws IOException { Object defaultValue = SingleValueParser.fromJson(type, defaultValueJson); Schema readerSchema = - new Schema( - required(999, "col1", Types.IntegerType.get()), - Types.NestedField.optional( - 1000, "col2", type, null, defaultValue, defaultValue)); + new Schema( + required(999, "col1", Types.IntegerType.get()), + Types.NestedField.optional(1000, "col2", type, null, defaultValue, defaultValue)); // Create a record with null value for the column with default value Record record = GenericRecord.create(readerSchema); @@ -178,20 +179,20 @@ public void testDefaultValueNotApplied() throws IOException { Assert.assertTrue("Delete should succeed", testFile.delete()); try (FileAppender writer = - Avro.write(Files.localOutput(testFile)) - .schema(readerSchema) - .createWriterFunc(DataWriter::create) - .named("test") - .build()) { + Avro.write(Files.localOutput(testFile)) + .schema(readerSchema) + .createWriterFunc(DataWriter::create) + .named("test") + .build()) { writer.add(record); } List rows; try (AvroIterable reader = - Avro.read(Files.localInput(testFile)) - .project(readerSchema) - .createReaderFunc(DataReader::create) - .build()) { + Avro.read(Files.localInput(testFile)) + .project(readerSchema) + .createReaderFunc(DataReader::create) + .build()) { rows = Lists.newArrayList(reader); } From 0629c8aa79e7b078fd56f8e607fea5913f90da2c Mon Sep 17 00:00:00 2001 From: Walaa Eldin Moustafa Date: Mon, 15 May 2023 22:26:14 -0700 Subject: [PATCH 09/11] Fix revapi and style check --- .palantir/revapi.yml | 11 ++++------- .../java/org/apache/iceberg/avro/ValueReaders.java | 12 ++++++++++-- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index fde647c85be3..6c9659fdcd34 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -406,6 +406,10 @@ acceptedBreaks: justification: "Removing deprecations for 1.2.0" "1.2.0": org.apache.iceberg:iceberg-api: + - code: "java.class.defaultSerializationChanged" + old: "class org.apache.iceberg.types.Types.NestedField" + new: "class org.apache.iceberg.types.Types.NestedField" + justification: "Serialization across versions is not supported" - code: "java.field.constantValueChanged" old: "field org.apache.iceberg.actions.RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT" new: "field org.apache.iceberg.actions.RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT" @@ -735,10 +739,6 @@ acceptedBreaks: old: "class org.apache.iceberg.PartitionKey" new: "class org.apache.iceberg.PartitionKey" justification: "Serialization across versions is not supported" - - code: "java.class.defaultSerializationChanged" - old: "class org.apache.iceberg.types.Types.NestedField" - new: "class org.apache.iceberg.types.Types.NestedField" - justification: "Serialization across versions is not supported" - code: "java.class.removed" old: "interface org.apache.iceberg.Rollback" justification: "Deprecations for 1.0 release" @@ -748,9 +748,6 @@ acceptedBreaks: - code: "java.method.addedToInterface" new: "method java.util.List org.apache.iceberg.Table::statisticsFiles()" justification: "new API method" - - code: "java.method.addedToInterface" - new: "method org.apache.iceberg.TableScan org.apache.iceberg.TableScan::useRef(java.lang.String)" - justification: "Adding table scan APIs to support scanning from refs" - code: "java.method.removed" old: "method java.lang.Iterable org.apache.iceberg.Snapshot::addedFiles()" justification: "Deprecations for 1.0 release" diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java index ab3c07326e83..85b3092dde8e 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java @@ -25,7 +25,14 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.*; +import java.util.Collection; +import java.util.Deque; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import java.util.function.Supplier; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; @@ -38,6 +45,7 @@ import org.apache.iceberg.data.IdentityPartitionConverters; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.UUIDUtil; @@ -665,7 +673,7 @@ public S read(Decoder decoder, Object reuse) throws IOException { S struct = reuseOrCreate(reuse); if (decoder instanceof ResolvingDecoder) { - Set existingFieldPositionsSet = new HashSet<>(); + Set existingFieldPositionsSet = Sets.newHashSet(); // This may not set all of the fields. nulls are set by default. for (Schema.Field field : ((ResolvingDecoder) decoder).readFieldOrder()) { Object reusedValue = get(struct, field.pos()); From 7ccf8685e08d308581f823832d29f7c5948948fc Mon Sep 17 00:00:00 2001 From: Walaa Eldin Moustafa Date: Mon, 7 Aug 2023 23:53:22 -0700 Subject: [PATCH 10/11] Address review comments --- .palantir/revapi.yml | 9 +++++ .../org/apache/iceberg/avro/ValueReaders.java | 35 +++++++++++++------ .../iceberg/data/avro/GenericReaders.java | 17 ++++++++- .../iceberg/flink/data/FlinkValueReaders.java | 4 ++- .../iceberg/spark/data/SparkValueReaders.java | 4 ++- 5 files changed, 55 insertions(+), 14 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 6c9659fdcd34..7f541f658f7b 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -451,6 +451,15 @@ acceptedBreaks: - code: "java.field.removedWithConstant" old: "field org.apache.iceberg.TableProperties.HMS_TABLE_OWNER" justification: "Removing deprecations for 1.3.0" + - code: "java.method.numberOfParametersChanged" + old: "method void org.apache.iceberg.avro.ValueReaders.StructReader::(java.util.List>,\ + \ org.apache.iceberg.types.Types.StructType, java.util.Map)" + new: "method void org.apache.iceberg.avro.ValueReaders.StructReader::(java.util.List>,\ + \ org.apache.iceberg.types.Types.StructType, java.util.Map, java.util.Map)" + justification: "Added idToDefault parameter to accommodate passing the default\ + \ map from the specific reader" - code: "java.method.parameterTypeChanged" old: "parameter void org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService::offer(===org.apache.iceberg.actions.RewriteFileGroup===)" new: "parameter void org.apache.iceberg.actions.BaseCommitService::offer(===T===)\ diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java index 85b3092dde8e..51a2214bd09c 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java @@ -42,7 +42,6 @@ import org.apache.avro.util.Utf8; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.common.DynConstructors; -import org.apache.iceberg.data.IdentityPartitionConverters; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -602,13 +601,16 @@ protected StructReader(List> readers, Schema schema) { } protected StructReader( - List> readers, Types.StructType struct, Map idToConstant) { + List> readers, + Types.StructType struct, + Map idToConstant, + Map idToDefault) { this.readers = readers.toArray(new ValueReader[0]); List fields = struct.fields(); List constantPositionsList = Lists.newArrayListWithCapacity(fields.size()); List constantValuesList = Lists.newArrayListWithCapacity(fields.size()); - List defaultValuesPositionList = Lists.newArrayListWithCapacity(fields.size()); + List defaultPositionList = Lists.newArrayListWithCapacity(fields.size()); List defaultValuesList = Lists.newArrayListWithCapacity(fields.size()); for (int pos = 0; pos < fields.size(); pos += 1) { Types.NestedField field = fields.get(pos); @@ -622,19 +624,28 @@ protected StructReader( constantPositionsList.add(pos); constantValuesList.add(false); } else if (field.initialDefault() != null) { - // Add a constant value for fields that have a default value. - // In the {@link #read()} method, this will be leveraged only if there is no corresponding - // reader. - defaultValuesPositionList.add(pos); - defaultValuesList.add( - IdentityPartitionConverters.convertConstant(field.type(), field.initialDefault())); + if (idToDefault.containsKey(field.fieldId())) { + // Add a constant value for fields that have a default value. + // In the {@link #read()} method, this will be leveraged only if there is no + // corresponding + // reader. + defaultPositionList.add(pos); + defaultValuesList.add(idToDefault.get(field.fieldId())); + } else { + // Throw an exception if the map does not contain a default value for that field. + throw new UnsupportedOperationException( + "Default value not found for field: " + + field.name() + + " (field ID: " + + field.fieldId() + + ")."); + } } } this.constantPositions = constantPositionsList.stream().mapToInt(Integer::intValue).toArray(); this.constantValues = constantValuesList.toArray(); - this.defaultPositions = - defaultValuesPositionList.stream().mapToInt(Integer::intValue).toArray(); + this.defaultPositions = defaultPositionList.stream().mapToInt(Integer::intValue).toArray(); this.defaultValues = defaultValuesList.toArray(); } @@ -680,6 +691,7 @@ public S read(Decoder decoder, Object reuse) throws IOException { set(struct, field.pos(), readers[field.pos()].read(decoder, reusedValue)); existingFieldPositionsSet.add(field.pos()); } + // Set default values for (int i = 0; i < defaultPositions.length; i += 1) { // Set default values only if the field does not exist in the data. @@ -687,6 +699,7 @@ public S read(Decoder decoder, Object reuse) throws IOException { set(struct, defaultPositions[i], defaultValues[i]); } } + } else { for (int i = 0; i < readers.length; i += 1) { Object reusedValue = get(struct, i); diff --git a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java index 91a728d53d38..d77a9de50f60 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java @@ -29,7 +29,10 @@ import org.apache.iceberg.avro.ValueReader; import org.apache.iceberg.avro.ValueReaders; import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.IdentityPartitionConverters; import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.StructType; import org.apache.iceberg.util.DateTimeUtil; @@ -106,7 +109,7 @@ private static class GenericRecordReader extends ValueReaders.StructReader> readers, StructType struct, Map idToConstant) { - super(readers, struct, idToConstant); + super(readers, struct, idToConstant, idToDefault(struct)); this.structType = struct; } @@ -128,5 +131,17 @@ protected Object get(Record struct, int pos) { protected void set(Record struct, int pos, Object value) { struct.set(pos, value); } + + private static Map idToDefault(StructType struct) { + Map result = Maps.newHashMap(); + for (Types.NestedField field : struct.fields()) { + if (field.initialDefault() != null) { + result.put( + field.fieldId(), + IdentityPartitionConverters.convertConstant(field.type(), field.initialDefault())); + } + } + return result; + } } } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java index 32f6c3a2ccfd..3e3be161b485 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java @@ -37,6 +37,7 @@ import org.apache.flink.table.data.TimestampData; import org.apache.iceberg.avro.ValueReader; import org.apache.iceberg.avro.ValueReaders; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; @@ -287,7 +288,8 @@ private static class StructReader extends ValueReaders.StructReader { private StructReader( List> readers, Types.StructType struct, Map idToConstant) { - super(readers, struct, idToConstant); + // TODO: Support passing default value map. + super(readers, struct, idToConstant, ImmutableMap.of()); this.numFields = readers.size(); } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java index 3cbf38d88bf4..0a95e168f353 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java @@ -30,6 +30,7 @@ import org.apache.avro.util.Utf8; import org.apache.iceberg.avro.ValueReader; import org.apache.iceberg.avro.ValueReaders; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.UUIDUtil; @@ -254,7 +255,8 @@ static class StructReader extends ValueReaders.StructReader { protected StructReader( List> readers, Types.StructType struct, Map idToConstant) { - super(readers, struct, idToConstant); + // TODO: Support passing default value map + super(readers, struct, idToConstant, ImmutableMap.of()); this.numFields = readers.size(); } From 057366bf0d6a8807c56b71dff62a64f8315c1157 Mon Sep 17 00:00:00 2001 From: Walaa Eldin Moustafa Date: Sun, 8 Oct 2023 11:57:03 -0700 Subject: [PATCH 11/11] Update StructReader constructor --- .palantir/revapi.yml | 9 --------- .../main/java/org/apache/iceberg/avro/ValueReaders.java | 6 ++++++ .../org/apache/iceberg/flink/data/FlinkValueReaders.java | 3 +-- .../org/apache/iceberg/spark/data/SparkValueReaders.java | 3 +-- 4 files changed, 8 insertions(+), 13 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 7f541f658f7b..6c9659fdcd34 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -451,15 +451,6 @@ acceptedBreaks: - code: "java.field.removedWithConstant" old: "field org.apache.iceberg.TableProperties.HMS_TABLE_OWNER" justification: "Removing deprecations for 1.3.0" - - code: "java.method.numberOfParametersChanged" - old: "method void org.apache.iceberg.avro.ValueReaders.StructReader::(java.util.List>,\ - \ org.apache.iceberg.types.Types.StructType, java.util.Map)" - new: "method void org.apache.iceberg.avro.ValueReaders.StructReader::(java.util.List>,\ - \ org.apache.iceberg.types.Types.StructType, java.util.Map, java.util.Map)" - justification: "Added idToDefault parameter to accommodate passing the default\ - \ map from the specific reader" - code: "java.method.parameterTypeChanged" old: "parameter void org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService::offer(===org.apache.iceberg.actions.RewriteFileGroup===)" new: "parameter void org.apache.iceberg.actions.BaseCommitService::offer(===T===)\ diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java index 51a2214bd09c..65419b3dcc9b 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java @@ -42,6 +42,7 @@ import org.apache.avro.util.Utf8; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -600,6 +601,11 @@ protected StructReader(List> readers, Schema schema) { this.defaultValues = new Object[0]; } + protected StructReader( + List> readers, Types.StructType struct, Map idToConstant) { + this(readers, struct, idToConstant, ImmutableMap.of()); + } + protected StructReader( List> readers, Types.StructType struct, diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java index 3e3be161b485..272813e937ba 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java @@ -37,7 +37,6 @@ import org.apache.flink.table.data.TimestampData; import org.apache.iceberg.avro.ValueReader; import org.apache.iceberg.avro.ValueReaders; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; @@ -289,7 +288,7 @@ private static class StructReader extends ValueReaders.StructReader { private StructReader( List> readers, Types.StructType struct, Map idToConstant) { // TODO: Support passing default value map. - super(readers, struct, idToConstant, ImmutableMap.of()); + super(readers, struct, idToConstant); this.numFields = readers.size(); } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java index 0a95e168f353..13c2e11e39eb 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java @@ -30,7 +30,6 @@ import org.apache.avro.util.Utf8; import org.apache.iceberg.avro.ValueReader; import org.apache.iceberg.avro.ValueReaders; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.UUIDUtil; @@ -256,7 +255,7 @@ static class StructReader extends ValueReaders.StructReader { protected StructReader( List> readers, Types.StructType struct, Map idToConstant) { // TODO: Support passing default value map - super(readers, struct, idToConstant, ImmutableMap.of()); + super(readers, struct, idToConstant); this.numFields = readers.size(); }