From 167c9bba55fd41ff5bc06d7f1ad10441665aed6c Mon Sep 17 00:00:00 2001 From: Aihua Xu Date: Wed, 12 Mar 2025 14:31:13 -0700 Subject: [PATCH 01/10] Add variant array reader in Parquet --- .../iceberg/variants/ShreddedArray.java | 55 +++ .../org/apache/iceberg/variants/Variants.java | 4 + .../parquet/ParquetVariantReaders.java | 100 ++++++ .../iceberg/parquet/VariantReaderBuilder.java | 9 +- .../iceberg/parquet/TestVariantReaders.java | 317 +++++++++++++++++- 5 files changed, 479 insertions(+), 6 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/variants/ShreddedArray.java diff --git a/core/src/main/java/org/apache/iceberg/variants/ShreddedArray.java b/core/src/main/java/org/apache/iceberg/variants/ShreddedArray.java new file mode 100644 index 000000000000..35c2984ea58b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/ShreddedArray.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class ShreddedArray implements VariantArray { + private List elements; + + ShreddedArray() { + this.elements = Lists.newArrayList(); + } + + @Override + public VariantValue get(int index) { + return elements.get(index); + } + + @Override + public int numElements() { + return elements.size(); + } + + public void add(VariantValue value) { + elements.add(value); + } + + @Override + public int sizeInBytes() { + throw new UnsupportedOperationException("sizeInBytes in ShreddedArray is not supported yet"); + } + + @Override + public int writeTo(ByteBuffer buffer, int offset) { + throw new UnsupportedOperationException("writeTo in ShreddedArray is not supported yet"); + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/Variants.java b/core/src/main/java/org/apache/iceberg/variants/Variants.java index d5f8cb4ae67c..ffeb60d4a2f1 100644 --- a/core/src/main/java/org/apache/iceberg/variants/Variants.java +++ b/core/src/main/java/org/apache/iceberg/variants/Variants.java @@ -121,6 +121,10 @@ public static boolean isNull(ByteBuffer valueBuffer) { return VariantUtil.readByte(valueBuffer, 0) == 0; } + public static ShreddedArray array() { + return new ShreddedArray(); + } + public static VariantPrimitive of(PhysicalType type, T value) { return new PrimitiveWrapper<>(type, value); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java index 3e5635958c0a..f42bd8a16954 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java @@ -30,6 +30,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.variants.PhysicalType; +import org.apache.iceberg.variants.ShreddedArray; import org.apache.iceberg.variants.ShreddedObject; import org.apache.iceberg.variants.Variant; import org.apache.iceberg.variants.VariantMetadata; @@ -95,6 +96,20 @@ public static VariantValueReader objects( fieldReaders); } + public static VariantValueReader array( + int valueDefinitionLevel, + ParquetValueReader valueReader, + int typedDefinitionLevel, + int typedRepetitionLevel, + VariantValueReader elementReader) { + return new ShreddedArrayReader( + valueDefinitionLevel, + (VariantValueReader) valueReader, + typedDefinitionLevel, + typedRepetitionLevel, + elementReader); + } + public static VariantValueReader asVariant(PhysicalType type, ParquetValueReader reader) { return new ValueAsVariantReader<>(type, reader); } @@ -332,6 +347,91 @@ public void setPageSource(PageReadStore pageStore) { } } + private static class ShreddedArrayReader implements VariantValueReader { + private final int valueDL; + private final VariantValueReader valueReader; + private final int repeatedDL; + private final int repeatedRL; + private final VariantValueReader elementReader; + private final TripleIterator valueColumn; + private final TripleIterator elementColumn; + private final List> children; + + private ShreddedArrayReader( + int valueDL, + VariantValueReader valueReader, + int typedDL, + int typedRL, + VariantValueReader elementReader) { + this.valueDL = valueDL; + this.valueReader = valueReader; + this.repeatedDL = typedDL + 1; + this.repeatedRL = typedRL + 1; + this.elementReader = elementReader; + this.elementColumn = this.elementReader.column(); + this.valueColumn = valueReader != null ? valueReader.column() : elementColumn; + this.children = + children(Iterables.concat(Arrays.asList(valueReader), Arrays.asList(elementReader))); + } + + @Override + public VariantValue read(VariantMetadata metadata) { + // if the current definition level is less to the definition level of the repeated + // type, i.e. typed_value is null, then it's not an array + boolean isArray = elementColumn.currentDefinitionLevel() >= repeatedDL; + VariantValue value = ParquetVariantReaders.read(metadata, valueReader, valueDL); + + if (isArray) { + Preconditions.checkArgument( + value == MISSING, "Invalid variant, non-array value: %s", value); + + // if the current definition level is equal to the definition level of this repeated + // type, then it's an empty list and the repetition level will always be <= rl. + ShreddedArray arr = Variants.array(); + do { + if (elementColumn.currentDefinitionLevel() > repeatedDL) { + VariantValue elementValue = elementReader.read(metadata); + arr.add(elementValue); + } else { + // consume the empty list triple + for (TripleIterator child : elementReader.columns()) { + child.nextNull(); + } + break; + } + } while (elementColumn.currentRepetitionLevel() > repeatedRL); + + return arr; + } + + // for non-arrays, advance the element iterators + for (TripleIterator child : elementReader.columns()) { + child.nextNull(); + } + + return value; + } + + @Override + public TripleIterator column() { + return valueColumn; + } + + @Override + public List> columns() { + return children; + } + + @Override + public void setPageSource(PageReadStore pageStore) { + if (valueReader != null) { + valueReader.setPageSource(pageStore); + } + + elementReader.setPageSource(pageStore); + } + } + private static class VariantReader implements ParquetValueReader { private final ParquetValueReader metadataReader; private final VariantValueReader valueReader; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java b/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java index df41c5aa6067..062c89f35acf 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java @@ -162,8 +162,13 @@ public VariantValueReader object( @Override public VariantValueReader array( - GroupType array, ParquetValueReader valueResult, ParquetValueReader elementResult) { - throw new UnsupportedOperationException("Array is not yet supported"); + GroupType array, ParquetValueReader valueReader, ParquetValueReader elementResult) { + int valueDL = + valueReader != null ? schema.getMaxDefinitionLevel(path(VALUE)) - 1 : Integer.MAX_VALUE; + int typedDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE)) - 1; + int typedRL = schema.getMaxRepetitionLevel(path(TYPED_VALUE)) - 1; + return ParquetVariantReaders.array( + valueDL, valueReader, typedDL, typedRL, (VariantValueReader) elementResult); } private static class LogicalTypeToVariantReader diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java index b0299762f7a2..ef5917111ad5 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java @@ -47,6 +47,7 @@ import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.VariantType; import org.apache.iceberg.variants.PhysicalType; +import org.apache.iceberg.variants.ShreddedArray; import org.apache.iceberg.variants.ShreddedObject; import org.apache.iceberg.variants.Variant; import org.apache.iceberg.variants.VariantMetadata; @@ -57,6 +58,8 @@ import org.apache.iceberg.variants.Variants; import org.apache.parquet.avro.AvroSchemaConverter; import org.apache.parquet.avro.AvroWriteSupport; +import org.apache.parquet.conf.ParquetConfiguration; +import org.apache.parquet.conf.PlainParquetConfiguration; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.schema.GroupType; @@ -135,6 +138,15 @@ public class TestVariantReaders { Variants.ofUUID("f24f9b64-81fa-49d1-b74e-8c09a6e31c56"), }; + // Required configuration to convert between Avro and Parquet schemas with 3-level list structure + private static final ParquetConfiguration CONF = + new PlainParquetConfiguration( + Map.of( + AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, + "false", + AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, + "false")); + private static Stream metadataAndValues() { Stream primitives = Stream.of(PRIMITIVES).map(variant -> Arguments.of(EMPTY_METADATA, variant)); @@ -255,7 +267,7 @@ public void testMissingValueColumn() throws IOException { } @Test - public void testValueAndTypedValueConflict() throws IOException { + public void testValueAndTypedValueConflict() { GroupType variantType = variant("var", 2, shreddedPrimitive(PrimitiveTypeName.INT32)); MessageType parquetSchema = parquetSchema(variantType); @@ -885,6 +897,270 @@ public void testMixedRecords() throws IOException { VariantTestUtil.assertEqual(expectedThree, actualThreeVariant.value()); } + @Test + public void testShreddedArray() throws IOException { + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType variantType = variant("var", 2, list(shreddedType)); + MessageType parquetSchema = parquetSchema(variantType); + + List arr = elements(shreddedType, List.of("comedy", "drama")); + GenericRecord var = + record( + variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); + GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); + + Record actual = writeAndRead(parquetSchema, row); + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + ShreddedArray expectedArray = Variants.array(); + expectedArray.add(Variants.of("comedy")); + expectedArray.add(Variants.of("drama")); + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(expectedArray, actualVariant.value()); + } + + @Test + public void testShreddedNullArray() throws IOException { + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType variantType = variant("var", 2, list(shreddedType)); + MessageType parquetSchema = parquetSchema(variantType); + + GenericRecord var = + record( + variantType, + Map.of( + "metadata", + VariantTestUtil.emptyMetadata(), + "value", + serialize(Variants.ofNull()))); + GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); + + Record actual = writeAndRead(parquetSchema, row); + + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(Variants.ofNull(), actualVariant.value()); + } + + @Test + public void testShreddedEmptyArray() throws IOException { + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType variantType = variant("var", 2, list(shreddedType)); + MessageType parquetSchema = parquetSchema(variantType); + + List arr = List.of(); + GenericRecord var = + record( + variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); + GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); + + Record actual = writeAndRead(parquetSchema, row); + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + Variant actualVariant = (Variant) actual.getField("var"); + assertThat(actualVariant.value().type()).isEqualTo(PhysicalType.ARRAY); + assertThat(actualVariant.value().asArray().numElements()).isEqualTo(0); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + } + + @Test + public void testShreddedArrayWithNull() throws IOException { + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType variantType = variant("var", 2, list(shreddedType)); + MessageType parquetSchema = parquetSchema(variantType); + + List arr = elements(shreddedType, Lists.newArrayList("comedy", null, "drama")); + GenericRecord var = + record( + variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); + GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); + + Record actual = writeAndRead(parquetSchema, row); + + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + Variant actualVariant = (Variant) actual.getField("var"); + assertThat(actualVariant.value().type()).isEqualTo(PhysicalType.ARRAY); + assertThat(actualVariant.value().asArray().numElements()).isEqualTo(3); + ShreddedArray expectedArray = Variants.array(); + expectedArray.add(Variants.of("comedy")); + expectedArray.add(Variants.ofNull()); + expectedArray.add(Variants.of("drama")); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(expectedArray, actualVariant.value()); + } + + @Test + public void testShreddedArrayWithNestedArray() throws IOException { + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType innerListType = list(shreddedType); + GroupType variantType = variant("var", 2, list(innerListType)); + MessageType parquetSchema = parquetSchema(variantType); + + List inner1 = elements(shreddedType, List.of("comedy", "drama")); + List inner2 = elements(shreddedType, List.of()); + List outer1 = elements(innerListType, List.of(inner1, inner2)); + GenericRecord var = + record( + variantType, + Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", outer1)); + GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); + + Record actual = writeAndRead(parquetSchema, row); + + // Verify + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + ShreddedArray expectedArray = Variants.array(); + ShreddedArray expectedInner1 = Variants.array(); + expectedInner1.add(Variants.of("comedy")); + expectedInner1.add(Variants.of("drama")); + ShreddedArray expectedInner2 = Variants.array(); + expectedArray.add(expectedInner1); + expectedArray.add(expectedInner2); + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(expectedArray, actualVariant.value()); + } + + @Test + public void testShreddedArrayWithNestedObject() throws IOException { + GroupType fieldA = field("a", shreddedPrimitive(PrimitiveTypeName.INT32)); + GroupType fieldB = field("b", shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); + GroupType shreddedFields = objectFields(fieldA, fieldB); + GroupType listType = list(shreddedFields); + GroupType fieldC = field("c", listType); + GroupType objectFields = objectFields(fieldC); + GroupType variantType = variant("var", 2, objectFields); + MessageType parquetSchema = parquetSchema(variantType); + + // Row 1 + GenericRecord a1 = record(fieldA, Map.of("typed_value", 1)); + GenericRecord b1 = record(fieldB, Map.of("typed_value", "comedy")); + GenericRecord shredded1 = record(shreddedFields, Map.of("a", a1, "b", b1)); + GenericRecord a2 = record(fieldA, Map.of("typed_value", 2)); + GenericRecord b2 = record(fieldB, Map.of("typed_value", "drama")); + GenericRecord shredded2 = record(shreddedFields, Map.of("a", a2, "b", b2)); + List arr1 = elements(shreddedFields, List.of(shredded1, shredded2)); + GenericRecord element1 = record(fieldC, Map.of("typed_value", arr1)); + GenericRecord c1 = record(objectFields, Map.of("c", element1)); + GenericRecord var1 = + record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", c1)); + GenericRecord row1 = record(parquetSchema, Map.of("id", 1, "var", var1)); + + // Row 2 + GenericRecord a3 = record(fieldA, Map.of("typed_value", 3)); + GenericRecord b3 = record(fieldB, Map.of("typed_value", "action")); + GenericRecord shredded3 = record(shreddedFields, Map.of("a", a3, "b", b3)); + GenericRecord a4 = record(fieldA, Map.of("typed_value", 4)); + GenericRecord b4 = record(fieldB, Map.of("typed_value", "horror")); + GenericRecord shredded4 = record(shreddedFields, Map.of("a", a4, "b", b4)); + List arr2 = elements(shreddedFields, List.of(shredded3, shredded4)); + GenericRecord element2 = record(fieldC, Map.of("typed_value", arr2)); + GenericRecord c2 = record(objectFields, Map.of("c", element2)); + GenericRecord var2 = + record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", c2)); + GenericRecord row2 = record(parquetSchema, Map.of("id", 2, "var", var2)); + + // verify + List actual = writeAndRead(parquetSchema, List.of(row1, row2)); + Record actual1 = actual.get(0); + assertThat(actual1.getField("id")).isEqualTo(1); + assertThat(actual1.getField("var")).isInstanceOf(Variant.class); + + ShreddedObject expected1 = Variants.object(TEST_METADATA); + ShreddedArray expectedArray1 = Variants.array(); + ShreddedObject expectedElement1 = Variants.object(TEST_METADATA); + expectedElement1.put("a", Variants.of(1)); + expectedElement1.put("b", Variants.of("comedy")); + expectedArray1.add(expectedElement1); + ShreddedObject expectedElement2 = Variants.object(TEST_METADATA); + expectedElement2.put("a", Variants.of(2)); + expectedElement2.put("b", Variants.of("drama")); + expectedArray1.add(expectedElement2); + expected1.put("c", expectedArray1); + + Variant actualVariant1 = (Variant) actual1.getField("var"); + VariantTestUtil.assertEqual(TEST_METADATA, actualVariant1.metadata()); + VariantTestUtil.assertEqual(expected1, actualVariant1.value()); + + Record actual2 = actual.get(1); + assertThat(actual2.getField("id")).isEqualTo(2); + assertThat(actual2.getField("var")).isInstanceOf(Variant.class); + + ShreddedObject expected2 = Variants.object(TEST_METADATA); + ShreddedArray expectedArray2 = Variants.array(); + ShreddedObject expectedElement3 = Variants.object(TEST_METADATA); + expectedElement3.put("a", Variants.of(3)); + expectedElement3.put("b", Variants.of("action")); + expectedArray2.add(expectedElement3); + ShreddedObject expectedElement4 = Variants.object(TEST_METADATA); + expectedElement4.put("a", Variants.of(4)); + expectedElement4.put("b", Variants.of("horror")); + expectedArray2.add(expectedElement4); + expected2.put("c", expectedArray2); + + Variant actualVariant2 = (Variant) actual2.getField("var"); + VariantTestUtil.assertEqual(TEST_METADATA, actualVariant2.metadata()); + VariantTestUtil.assertEqual(expected2, actualVariant2.value()); + } + + @Test + public void testShreddedArrayWithNonArray() throws IOException { + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType variantType = variant("var", 2, list(shreddedType)); + MessageType parquetSchema = parquetSchema(variantType); + + List arr1 = elements(shreddedType, List.of("comedy", "drama")); + GenericRecord var1 = + record( + variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr1)); + GenericRecord row1 = record(parquetSchema, Map.of("id", 1, "var", var1)); + + GenericRecord var2 = + record( + variantType, + Map.of( + "metadata", VariantTestUtil.emptyMetadata(), "value", serialize(Variants.of(34)))); + GenericRecord row2 = record(parquetSchema, Map.of("id", 2, "var", var2)); + + GenericRecord var3 = + record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "value", TEST_OBJECT_BUFFER)); + GenericRecord row3 = record(parquetSchema, Map.of("id", 3, "var", var3)); + + List actual = writeAndRead(parquetSchema, List.of(row1, row2, row3)); + + // Verify + Record actual1 = actual.get(0); + assertThat(actual1.getField("id")).isEqualTo(1); + assertThat(actual1.getField("var")).isInstanceOf(Variant.class); + ShreddedArray expectedArray1 = Variants.array(); + expectedArray1.add(Variants.of("comedy")); + expectedArray1.add(Variants.of("drama")); + Variant actualVariant1 = (Variant) actual1.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant1.metadata()); + + Record actual2 = actual.get(1); + assertThat(actual2.getField("id")).isEqualTo(2); + assertThat(actual2.getField("var")).isInstanceOf(Variant.class); + Variant actualVariant2 = (Variant) actual2.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant2.metadata()); + VariantTestUtil.assertEqual(Variants.of(PhysicalType.INT32, 34), actualVariant2.value()); + + Record actual3 = actual.get(2); + assertThat(actual3.getField("id")).isEqualTo(3); + assertThat(actual3.getField("var")).isInstanceOf(Variant.class); + Variant actualVariant3 = (Variant) actual3.getField("var"); + VariantTestUtil.assertEqual(TEST_METADATA, actualVariant3.metadata()); + ShreddedObject expected = Variants.object(TEST_METADATA); + expected.put("a", Variants.ofNull()); + expected.put("d", Variants.of("iceberg")); + VariantTestUtil.assertEqual(expected, actualVariant3.value()); + } + private static ByteBuffer serialize(VariantValue value) { ByteBuffer buffer = ByteBuffer.allocate(value.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); value.writeTo(buffer, 0); @@ -906,6 +1182,31 @@ private static GenericRecord record(GroupType type, Map fields) return record; } + private static List elements(Type shreddedType, List elements) { + GroupType elementType = + Types.buildGroup(Type.Repetition.REQUIRED) + .addField( + Types.primitive(PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL).named("value")) + .addField(shreddedType) + .named("element"); + org.apache.avro.Schema elementSchema = avroSchema(elementType); + + List elementRecords = Lists.newArrayList(); + if (elements != null) { + for (T element : elements) { + GenericRecord elementRecord = new GenericData.Record(elementSchema); + if (element != null) { + elementRecord.put("typed_value", element); + } else { + elementRecord.put("value", serialize(Variants.ofNull())); + } + elementRecords.add(elementRecord); + } + } + + return elementRecords; + } + /** * This is a custom Parquet writer builder that injects a specific Parquet schema and then uses * the Avro object model. This ensures that the Parquet file's schema is exactly what was passed. @@ -943,7 +1244,7 @@ static List writeAndRead(MessageType parquetSchema, List OutputFile outputFile = new InMemoryOutputFile(); try (ParquetWriter writer = - new TestWriterBuilder(outputFile).withFileType(parquetSchema).build()) { + new TestWriterBuilder(outputFile).withFileType(parquetSchema).withConf(CONF).build()) { for (GenericRecord record : records) { writer.write(record); } @@ -1104,14 +1405,22 @@ private static GroupType field(String name, Type shreddedType) { .named(name); } + private static GroupType list(Type shreddedType) { + return Types.optionalList() + .requiredGroupElement() + .addField(Types.optional(PrimitiveTypeName.BINARY).named("value")) + .addField(shreddedType) + .named("typed_value"); + } + private static org.apache.avro.Schema avroSchema(GroupType schema) { if (schema instanceof MessageType) { - return new AvroSchemaConverter().convert((MessageType) schema); + return new AvroSchemaConverter(CONF).convert((MessageType) schema); } else { MessageType wrapped = Types.buildMessage().addField(schema).named("table"); org.apache.avro.Schema avro = - new AvroSchemaConverter().convert(wrapped).getFields().get(0).schema(); + new AvroSchemaConverter(CONF).convert(wrapped).getFields().get(0).schema(); switch (avro.getType()) { case RECORD: return avro; From ae32c0b58a543ba8ec66e2ccecdc3ee3b1c168e8 Mon Sep 17 00:00:00 2001 From: Aihua Xu Date: Thu, 13 Mar 2025 18:57:58 -0700 Subject: [PATCH 02/10] Implement ShreddedArray --- .../iceberg/variants/ShreddedArray.java | 87 +++++++- .../iceberg/variants/TestShreddedArray.java | 190 ++++++++++++++++++ 2 files changed, 271 insertions(+), 6 deletions(-) create mode 100644 core/src/test/java/org/apache/iceberg/variants/TestShreddedArray.java diff --git a/core/src/main/java/org/apache/iceberg/variants/ShreddedArray.java b/core/src/main/java/org/apache/iceberg/variants/ShreddedArray.java index 35c2984ea58b..cb250486d03a 100644 --- a/core/src/main/java/org/apache/iceberg/variants/ShreddedArray.java +++ b/core/src/main/java/org/apache/iceberg/variants/ShreddedArray.java @@ -19,15 +19,16 @@ package org.apache.iceberg.variants; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.List; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; public class ShreddedArray implements VariantArray { - private List elements; + private SerializationState serializationState = null; + private List elements = Lists.newArrayList(); - ShreddedArray() { - this.elements = Lists.newArrayList(); - } + ShreddedArray() {} @Override public VariantValue get(int index) { @@ -41,15 +42,89 @@ public int numElements() { public void add(VariantValue value) { elements.add(value); + this.serializationState = null; } @Override public int sizeInBytes() { - throw new UnsupportedOperationException("sizeInBytes in ShreddedArray is not supported yet"); + if (null == serializationState) { + this.serializationState = new SerializationState(elements); + } + + return serializationState.size(); } @Override public int writeTo(ByteBuffer buffer, int offset) { - throw new UnsupportedOperationException("writeTo in ShreddedArray is not supported yet"); + Preconditions.checkArgument( + buffer.order() == ByteOrder.LITTLE_ENDIAN, "Invalid byte order: big endian"); + + if (null == serializationState) { + this.serializationState = new SerializationState(elements); + } + + return serializationState.writeTo(buffer, offset); + } + + /** Common state for {@link #size()} and {@link #writeTo(ByteBuffer, int)} */ + private static class SerializationState { + private final List elements; + private final int numElements; + private final boolean isLarge; + private final int dataSize; + private final int offsetSize; + + private SerializationState(List elements) { + this.elements = elements; + this.numElements = elements.size(); + this.isLarge = numElements > 0xFF; + + int totalDataSize = 0; + for (VariantValue value : elements) { + totalDataSize += value.sizeInBytes(); + } + + this.dataSize = totalDataSize; + this.offsetSize = VariantUtil.sizeOf(totalDataSize); + } + + private int size() { + return 1 /* header */ + + (isLarge ? 4 : 1) /* num elements size */ + + (1 + numElements) * offsetSize /* offset list size */ + + dataSize; + } + + private int writeTo(ByteBuffer buffer, int offset) { + int offsetListOffset = + offset + 1 /* header size */ + (isLarge ? 4 : 1) /* num elements size */; + int dataOffset = offsetListOffset + ((1 + numElements) * offsetSize); + byte header = VariantUtil.arrayHeader(isLarge, offsetSize); + + VariantUtil.writeByte(buffer, header, offset); + VariantUtil.writeLittleEndianUnsigned(buffer, numElements, offset + 1, isLarge ? 4 : 1); + + // Insert element offsets + int nextValueOffset = 0; + int index = 0; + for (VariantValue element : elements) { + // write the data offset + VariantUtil.writeLittleEndianUnsigned( + buffer, nextValueOffset, offsetListOffset + (index * offsetSize), offsetSize); + + // write the data + int valueSize = element.writeTo(buffer, dataOffset + nextValueOffset); + + nextValueOffset += valueSize; + index += 1; + } + + // write the final size of the data section + VariantUtil.writeLittleEndianUnsigned( + buffer, nextValueOffset, offsetListOffset + (index * offsetSize), offsetSize); + + // return the total size + return (dataOffset - offset) + dataSize; + } } } diff --git a/core/src/test/java/org/apache/iceberg/variants/TestShreddedArray.java b/core/src/test/java/org/apache/iceberg/variants/TestShreddedArray.java new file mode 100644 index 000000000000..163399c82d99 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/variants/TestShreddedArray.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.List; +import java.util.Random; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.RandomUtil; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class TestShreddedArray { + private static final VariantMetadata EMPTY_METADATA = + Variants.metadata(VariantTestUtil.emptyMetadata()); + + private static final List ELEMENTS = + ImmutableList.of( + Variants.of(34), Variants.of("iceberg"), Variants.of(new BigDecimal("12.21"))); + + private final Random random = new Random(871925); + + @Test + public void testShreddedFields() { + ShreddedArray arr = createShreddedArray(ELEMENTS); + + assertThat(arr.get(0)).isInstanceOf(VariantPrimitive.class); + assertThat(arr.get(0).asPrimitive().get()).isEqualTo(34); + assertThat(arr.get(1)).isInstanceOf(VariantPrimitive.class); + assertThat(arr.get(1).asPrimitive().get()).isEqualTo("iceberg"); + assertThat(arr.get(2)).isInstanceOf(VariantPrimitive.class); + assertThat(arr.get(2).asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); + } + + @Test + public void testSerializationMinimalBuffer() { + ShreddedArray arr = createShreddedArray(ELEMENTS); + + VariantValue value = roundTripMinimalBuffer(arr); + + assertThat(value).isInstanceOf(SerializedArray.class); + SerializedArray actual = (SerializedArray) value; + + assertThat(actual.numElements()).isEqualTo(3); + assertThat(actual.get(0)).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get(0).asPrimitive().get()).isEqualTo(34); + assertThat(actual.get(1)).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get(1).asPrimitive().get()).isEqualTo("iceberg"); + assertThat(actual.get(2)).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get(2).asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); + } + + @Test + public void testSerializationLargeBuffer() { + ShreddedArray arr = createShreddedArray(ELEMENTS); + + VariantValue value = roundTripLargeBuffer(arr); + + assertThat(value).isInstanceOf(SerializedArray.class); + SerializedArray actual = (SerializedArray) value; + + assertThat(actual.numElements()).isEqualTo(3); + assertThat(actual.get(0)).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get(0).asPrimitive().get()).isEqualTo(34); + assertThat(actual.get(1)).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get(1).asPrimitive().get()).isEqualTo("iceberg"); + assertThat(actual.get(2)).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get(2).asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); + } + + @Test + public void testMultiByteElementSize() { + // Create large number of elements to use 4 bytes to store element size + List elements = Lists.newArrayList(); + for (int i = 0; i < 100_000; i += 1) { + elements.add(Variants.of(RandomUtil.generateString(10, random))); + } + + List data = Lists.newArrayList(); + data.addAll(elements); + + ShreddedArray shredded = createShreddedArray(data); + VariantValue value = roundTripLargeBuffer(shredded); + + assertThat(value.type()).isEqualTo(PhysicalType.ARRAY); + SerializedArray arr = (SerializedArray) value; + assertThat(arr.numElements()).isEqualTo(100_000); + for (int i = 0; i < 100_000; i++) { + VariantTestUtil.assertEqual(arr.get(i), elements.get(i)); + } + } + + @ParameterizedTest + @ValueSource(ints = {300, 70_000, 16_777_300}) + public void testMultiByteOffsets(int len) { + // Use a string exceeding 255 bytes to test value offset sizes of 2, 3, and 4 bytes + String randomString = RandomUtil.generateString(len, random); + SerializedPrimitive bigString = VariantTestUtil.createString(randomString); + + List data = Lists.newArrayList(); + data.addAll(ELEMENTS); + data.add(bigString); + + ShreddedArray shredded = createShreddedArray(data); + VariantValue value = roundTripLargeBuffer(shredded); + + assertThat(value.type()).isEqualTo(PhysicalType.ARRAY); + SerializedArray arr = (SerializedArray) value; + assertThat(arr.numElements()).isEqualTo(4); + + assertThat(arr.get(0).type()).isEqualTo(PhysicalType.INT32); + assertThat(arr.get(0).asPrimitive().get()).isEqualTo(34); + assertThat(arr.get(1).type()).isEqualTo(PhysicalType.STRING); + assertThat(arr.get(1).asPrimitive().get()).isEqualTo("iceberg"); + assertThat(arr.get(2).type()).isEqualTo(PhysicalType.DECIMAL4); + assertThat(arr.get(2).asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); + assertThat(arr.get(3).type()).isEqualTo(PhysicalType.STRING); + assertThat(arr.get(3).asPrimitive().get()).isEqualTo(randomString); + } + + @Test + public void testLargeObject() { + List> elements = Lists.newArrayList(); + for (int i = 0; i < 10_000; i += 1) { + elements.add(Variants.of(RandomUtil.generateString(10, random))); + } + + ShreddedArray shredded = createShreddedArray((List) elements); + VariantValue value = roundTripLargeBuffer(shredded); + + assertThat(value.type()).isEqualTo(PhysicalType.ARRAY); + SerializedArray arr = (SerializedArray) value; + assertThat(arr.numElements()).isEqualTo(10_000); + + for (VariantPrimitive entry : elements) { + assertThat(entry.type()).isEqualTo(PhysicalType.STRING); + assertThat(entry.asPrimitive().get()).isEqualTo(entry.get()); + } + } + + private static VariantValue roundTripMinimalBuffer(ShreddedArray arr) { + ByteBuffer serialized = ByteBuffer.allocate(arr.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); + arr.writeTo(serialized, 0); + + return Variants.value(EMPTY_METADATA, serialized); + } + + private static VariantValue roundTripLargeBuffer(ShreddedArray arr) { + ByteBuffer serialized = + ByteBuffer.allocate(1000 + arr.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); + arr.writeTo(serialized, 300); + + ByteBuffer slice = serialized.duplicate().order(ByteOrder.LITTLE_ENDIAN); + slice.position(300); + slice.limit(300 + arr.sizeInBytes()); + + return Variants.value(EMPTY_METADATA, slice); + } + + private static ShreddedArray createShreddedArray(List elements) { + ShreddedArray arr = new ShreddedArray(); + for (VariantValue element : elements) { + arr.add(element); + } + + return arr; + } +} From 55f80faf62676b2aff65af6332c8fa4b9f5ca91b Mon Sep 17 00:00:00 2001 From: Aihua Xu Date: Tue, 18 Mar 2025 14:48:58 -0700 Subject: [PATCH 03/10] Update ArrayReader --- .../{ShreddedArray.java => ValueArray.java} | 4 +- .../org/apache/iceberg/variants/Variants.java | 4 +- .../iceberg/variants/TestShreddedObject.java | 65 +--------- ...ShreddedArray.java => TestValueArray.java} | 86 +++++-------- .../parquet/ParquetVariantReaders.java | 117 +++++++----------- .../iceberg/parquet/VariantReaderBuilder.java | 10 +- .../iceberg/parquet/TestVariantReaders.java | 32 ++--- 7 files changed, 107 insertions(+), 211 deletions(-) rename core/src/main/java/org/apache/iceberg/variants/{ShreddedArray.java => ValueArray.java} (98%) rename core/src/test/java/org/apache/iceberg/variants/{TestShreddedArray.java => TestValueArray.java} (64%) diff --git a/core/src/main/java/org/apache/iceberg/variants/ShreddedArray.java b/core/src/main/java/org/apache/iceberg/variants/ValueArray.java similarity index 98% rename from core/src/main/java/org/apache/iceberg/variants/ShreddedArray.java rename to core/src/main/java/org/apache/iceberg/variants/ValueArray.java index cb250486d03a..3da79bcef106 100644 --- a/core/src/main/java/org/apache/iceberg/variants/ShreddedArray.java +++ b/core/src/main/java/org/apache/iceberg/variants/ValueArray.java @@ -24,11 +24,11 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -public class ShreddedArray implements VariantArray { +public class ValueArray implements VariantArray { private SerializationState serializationState = null; private List elements = Lists.newArrayList(); - ShreddedArray() {} + ValueArray() {} @Override public VariantValue get(int index) { diff --git a/core/src/main/java/org/apache/iceberg/variants/Variants.java b/core/src/main/java/org/apache/iceberg/variants/Variants.java index ffeb60d4a2f1..5591145ca603 100644 --- a/core/src/main/java/org/apache/iceberg/variants/Variants.java +++ b/core/src/main/java/org/apache/iceberg/variants/Variants.java @@ -121,8 +121,8 @@ public static boolean isNull(ByteBuffer valueBuffer) { return VariantUtil.readByte(valueBuffer, 0) == 0; } - public static ShreddedArray array() { - return new ShreddedArray(); + public static ValueArray array() { + return new ValueArray(); } public static VariantPrimitive of(PhysicalType type, T value) { diff --git a/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java b/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java index 6707ae6651a0..9d9df2bb87f7 100644 --- a/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java +++ b/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java @@ -217,11 +217,12 @@ public void testPartiallyShreddedObjectSerializationLargeBuffer() { .isEqualTo(DateTimeUtil.isoDateToDays("2024-10-12")); } - @Test - public void testTwoByteOffsets() { - // a string larger than 255 bytes to push the value offset size above 1 byte - String randomString = RandomUtil.generateString(300, random); - SerializedPrimitive bigString = VariantTestUtil.createString(randomString); + @ParameterizedTest + @ValueSource(ints = {300, 70_000, 16_777_300}) + public void testMultiByteOffsets(int len) { + // Use a string exceeding 255 bytes to test value offset sizes of 2, 3, and 4 bytes + String randomString = RandomUtil.generateString(len, random); + VariantPrimitive bigString = Variants.of(randomString); Map data = Maps.newHashMap(); data.putAll(FIELDS); @@ -244,60 +245,6 @@ public void testTwoByteOffsets() { assertThat(object.get("big").asPrimitive().get()).isEqualTo(randomString); } - @Test - public void testThreeByteOffsets() { - // a string larger than 65535 bytes to push the value offset size above 2 bytes - String randomString = RandomUtil.generateString(70_000, random); - SerializedPrimitive reallyBigString = VariantTestUtil.createString(randomString); - - Map data = Maps.newHashMap(); - data.putAll(FIELDS); - data.put("really-big", reallyBigString); - - ShreddedObject shredded = createShreddedObject(data); - VariantValue value = roundTripLargeBuffer(shredded, shredded.metadata()); - - assertThat(value.type()).isEqualTo(PhysicalType.OBJECT); - SerializedObject object = (SerializedObject) value; - assertThat(object.numFields()).isEqualTo(4); - - assertThat(object.get("a").type()).isEqualTo(PhysicalType.INT32); - assertThat(object.get("a").asPrimitive().get()).isEqualTo(34); - assertThat(object.get("b").type()).isEqualTo(PhysicalType.STRING); - assertThat(object.get("b").asPrimitive().get()).isEqualTo("iceberg"); - assertThat(object.get("c").type()).isEqualTo(PhysicalType.DECIMAL4); - assertThat(object.get("c").asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); - assertThat(object.get("really-big").type()).isEqualTo(PhysicalType.STRING); - assertThat(object.get("really-big").asPrimitive().get()).isEqualTo(randomString); - } - - @Test - public void testFourByteOffsets() { - // a string larger than 16777215 bytes to push the value offset size above 3 bytes - String randomString = RandomUtil.generateString(16_777_300, random); - SerializedPrimitive reallyBigString = VariantTestUtil.createString(randomString); - - Map data = Maps.newHashMap(); - data.putAll(FIELDS); - data.put("really-big", reallyBigString); - - ShreddedObject shredded = createShreddedObject(data); - VariantValue value = roundTripLargeBuffer(shredded, shredded.metadata()); - - assertThat(value.type()).isEqualTo(PhysicalType.OBJECT); - SerializedObject object = (SerializedObject) value; - assertThat(object.numFields()).isEqualTo(4); - - assertThat(object.get("a").type()).isEqualTo(PhysicalType.INT32); - assertThat(object.get("a").asPrimitive().get()).isEqualTo(34); - assertThat(object.get("b").type()).isEqualTo(PhysicalType.STRING); - assertThat(object.get("b").asPrimitive().get()).isEqualTo("iceberg"); - assertThat(object.get("c").type()).isEqualTo(PhysicalType.DECIMAL4); - assertThat(object.get("c").asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); - assertThat(object.get("really-big").type()).isEqualTo(PhysicalType.STRING); - assertThat(object.get("really-big").asPrimitive().get()).isEqualTo(randomString); - } - @ParameterizedTest @ValueSource(booleans = {true, false}) @SuppressWarnings({"unchecked", "rawtypes"}) diff --git a/core/src/test/java/org/apache/iceberg/variants/TestShreddedArray.java b/core/src/test/java/org/apache/iceberg/variants/TestValueArray.java similarity index 64% rename from core/src/test/java/org/apache/iceberg/variants/TestShreddedArray.java rename to core/src/test/java/org/apache/iceberg/variants/TestValueArray.java index 163399c82d99..6a2fec626b9f 100644 --- a/core/src/test/java/org/apache/iceberg/variants/TestShreddedArray.java +++ b/core/src/test/java/org/apache/iceberg/variants/TestValueArray.java @@ -32,10 +32,8 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -public class TestShreddedArray { - private static final VariantMetadata EMPTY_METADATA = - Variants.metadata(VariantTestUtil.emptyMetadata()); - +public class TestValueArray { + private static final VariantMetadata EMPTY_METADATA = Variants.emptyMetadata(); private static final List ELEMENTS = ImmutableList.of( Variants.of(34), Variants.of("iceberg"), Variants.of(new BigDecimal("12.21"))); @@ -43,9 +41,10 @@ public class TestShreddedArray { private final Random random = new Random(871925); @Test - public void testShreddedFields() { - ShreddedArray arr = createShreddedArray(ELEMENTS); + public void testElementAccess() { + ValueArray arr = createArray(ELEMENTS); + assertThat(arr.numElements()).isEqualTo(3); assertThat(arr.get(0)).isInstanceOf(VariantPrimitive.class); assertThat(arr.get(0).asPrimitive().get()).isEqualTo(34); assertThat(arr.get(1)).isInstanceOf(VariantPrimitive.class); @@ -56,7 +55,7 @@ public void testShreddedFields() { @Test public void testSerializationMinimalBuffer() { - ShreddedArray arr = createShreddedArray(ELEMENTS); + ValueArray arr = createArray(ELEMENTS); VariantValue value = roundTripMinimalBuffer(arr); @@ -74,7 +73,7 @@ public void testSerializationMinimalBuffer() { @Test public void testSerializationLargeBuffer() { - ShreddedArray arr = createShreddedArray(ELEMENTS); + ValueArray arr = createArray(ELEMENTS); VariantValue value = roundTripLargeBuffer(arr); @@ -90,84 +89,61 @@ public void testSerializationLargeBuffer() { assertThat(actual.get(2).asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); } - @Test - public void testMultiByteElementSize() { - // Create large number of elements to use 4 bytes to store element size - List elements = Lists.newArrayList(); - for (int i = 0; i < 100_000; i += 1) { - elements.add(Variants.of(RandomUtil.generateString(10, random))); - } - - List data = Lists.newArrayList(); - data.addAll(elements); - - ShreddedArray shredded = createShreddedArray(data); - VariantValue value = roundTripLargeBuffer(shredded); - - assertThat(value.type()).isEqualTo(PhysicalType.ARRAY); - SerializedArray arr = (SerializedArray) value; - assertThat(arr.numElements()).isEqualTo(100_000); - for (int i = 0; i < 100_000; i++) { - VariantTestUtil.assertEqual(arr.get(i), elements.get(i)); - } - } - @ParameterizedTest @ValueSource(ints = {300, 70_000, 16_777_300}) public void testMultiByteOffsets(int len) { // Use a string exceeding 255 bytes to test value offset sizes of 2, 3, and 4 bytes String randomString = RandomUtil.generateString(len, random); - SerializedPrimitive bigString = VariantTestUtil.createString(randomString); + VariantPrimitive bigString = Variants.of(randomString); List data = Lists.newArrayList(); data.addAll(ELEMENTS); data.add(bigString); - ShreddedArray shredded = createShreddedArray(data); + ValueArray shredded = createArray(data); VariantValue value = roundTripLargeBuffer(shredded); assertThat(value.type()).isEqualTo(PhysicalType.ARRAY); - SerializedArray arr = (SerializedArray) value; - assertThat(arr.numElements()).isEqualTo(4); - - assertThat(arr.get(0).type()).isEqualTo(PhysicalType.INT32); - assertThat(arr.get(0).asPrimitive().get()).isEqualTo(34); - assertThat(arr.get(1).type()).isEqualTo(PhysicalType.STRING); - assertThat(arr.get(1).asPrimitive().get()).isEqualTo("iceberg"); - assertThat(arr.get(2).type()).isEqualTo(PhysicalType.DECIMAL4); - assertThat(arr.get(2).asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); - assertThat(arr.get(3).type()).isEqualTo(PhysicalType.STRING); - assertThat(arr.get(3).asPrimitive().get()).isEqualTo(randomString); + SerializedArray actualArray = (SerializedArray) value; + assertThat(actualArray.numElements()).isEqualTo(4); + + assertThat(actualArray.get(0).type()).isEqualTo(PhysicalType.INT32); + assertThat(actualArray.get(0).asPrimitive().get()).isEqualTo(34); + assertThat(actualArray.get(1).type()).isEqualTo(PhysicalType.STRING); + assertThat(actualArray.get(1).asPrimitive().get()).isEqualTo("iceberg"); + assertThat(actualArray.get(2).type()).isEqualTo(PhysicalType.DECIMAL4); + assertThat(actualArray.get(2).asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); + assertThat(actualArray.get(3).type()).isEqualTo(PhysicalType.STRING); + assertThat(actualArray.get(3).asPrimitive().get()).isEqualTo(randomString); } @Test - public void testLargeObject() { + public void testLargeArray() { List> elements = Lists.newArrayList(); for (int i = 0; i < 10_000; i += 1) { elements.add(Variants.of(RandomUtil.generateString(10, random))); } - ShreddedArray shredded = createShreddedArray((List) elements); - VariantValue value = roundTripLargeBuffer(shredded); + ValueArray arr = createArray((List) elements); + VariantValue value = roundTripLargeBuffer(arr); assertThat(value.type()).isEqualTo(PhysicalType.ARRAY); - SerializedArray arr = (SerializedArray) value; - assertThat(arr.numElements()).isEqualTo(10_000); + SerializedArray actualArray = (SerializedArray) value; + assertThat(actualArray.numElements()).isEqualTo(10_000); - for (VariantPrimitive entry : elements) { - assertThat(entry.type()).isEqualTo(PhysicalType.STRING); - assertThat(entry.asPrimitive().get()).isEqualTo(entry.get()); + for (int i = 0; i < 10_000; i++) { + VariantTestUtil.assertEqual(elements.get(i), actualArray.get(i)); } } - private static VariantValue roundTripMinimalBuffer(ShreddedArray arr) { + private static VariantValue roundTripMinimalBuffer(ValueArray arr) { ByteBuffer serialized = ByteBuffer.allocate(arr.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); arr.writeTo(serialized, 0); return Variants.value(EMPTY_METADATA, serialized); } - private static VariantValue roundTripLargeBuffer(ShreddedArray arr) { + private static VariantValue roundTripLargeBuffer(ValueArray arr) { ByteBuffer serialized = ByteBuffer.allocate(1000 + arr.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); arr.writeTo(serialized, 300); @@ -179,8 +155,8 @@ private static VariantValue roundTripLargeBuffer(ShreddedArray arr) { return Variants.value(EMPTY_METADATA, slice); } - private static ShreddedArray createShreddedArray(List elements) { - ShreddedArray arr = new ShreddedArray(); + private static ValueArray createArray(List elements) { + ValueArray arr = new ValueArray(); for (VariantValue element : elements) { arr.add(element); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java index f42bd8a16954..80602d20f359 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java @@ -30,8 +30,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.variants.PhysicalType; -import org.apache.iceberg.variants.ShreddedArray; import org.apache.iceberg.variants.ShreddedObject; +import org.apache.iceberg.variants.ValueArray; import org.apache.iceberg.variants.Variant; import org.apache.iceberg.variants.VariantMetadata; import org.apache.iceberg.variants.VariantObject; @@ -100,14 +100,13 @@ public static VariantValueReader array( int valueDefinitionLevel, ParquetValueReader valueReader, int typedDefinitionLevel, - int typedRepetitionLevel, - VariantValueReader elementReader) { - return new ShreddedArrayReader( - valueDefinitionLevel, - (VariantValueReader) valueReader, - typedDefinitionLevel, - typedRepetitionLevel, - elementReader); + int repeatedDefinitionLevel, + int repeatedRepetitionLevel, + ParquetValueReader elementReader) { + VariantValueReader typedReader = + new ArrayReader( + repeatedDefinitionLevel, repeatedRepetitionLevel, (VariantValueReader) elementReader); + return shredded(valueDefinitionLevel, valueReader, typedDefinitionLevel, typedReader); } public static VariantValueReader asVariant(PhysicalType type, ParquetValueReader reader) { @@ -347,89 +346,61 @@ public void setPageSource(PageReadStore pageStore) { } } - private static class ShreddedArrayReader implements VariantValueReader { - private final int valueDL; - private final VariantValueReader valueReader; - private final int repeatedDL; - private final int repeatedRL; - private final VariantValueReader elementReader; - private final TripleIterator valueColumn; - private final TripleIterator elementColumn; + private static class ArrayReader implements VariantValueReader { + private final int definitionLevel; + private final int repetitionLevel; + private final VariantValueReader reader; + private final TripleIterator column; private final List> children; - private ShreddedArrayReader( - int valueDL, - VariantValueReader valueReader, - int typedDL, - int typedRL, - VariantValueReader elementReader) { - this.valueDL = valueDL; - this.valueReader = valueReader; - this.repeatedDL = typedDL + 1; - this.repeatedRL = typedRL + 1; - this.elementReader = elementReader; - this.elementColumn = this.elementReader.column(); - this.valueColumn = valueReader != null ? valueReader.column() : elementColumn; - this.children = - children(Iterables.concat(Arrays.asList(valueReader), Arrays.asList(elementReader))); + protected ArrayReader(int definitionLevel, int repetitionLevel, VariantValueReader reader) { + this.definitionLevel = definitionLevel; + this.repetitionLevel = repetitionLevel; + this.reader = reader; + this.column = reader.column(); + this.children = reader.columns(); } @Override - public VariantValue read(VariantMetadata metadata) { - // if the current definition level is less to the definition level of the repeated - // type, i.e. typed_value is null, then it's not an array - boolean isArray = elementColumn.currentDefinitionLevel() >= repeatedDL; - VariantValue value = ParquetVariantReaders.read(metadata, valueReader, valueDL); - - if (isArray) { - Preconditions.checkArgument( - value == MISSING, "Invalid variant, non-array value: %s", value); - - // if the current definition level is equal to the definition level of this repeated - // type, then it's an empty list and the repetition level will always be <= rl. - ShreddedArray arr = Variants.array(); - do { - if (elementColumn.currentDefinitionLevel() > repeatedDL) { - VariantValue elementValue = elementReader.read(metadata); - arr.add(elementValue); - } else { - // consume the empty list triple - for (TripleIterator child : elementReader.columns()) { - child.nextNull(); - } - break; - } - } while (elementColumn.currentRepetitionLevel() > repeatedRL); + public void setPageSource(PageReadStore pageStore) { + reader.setPageSource(pageStore); + } - return arr; + @Override + public ValueArray read(VariantMetadata metadata) { + // if the current definition level is less than the definition level of this repeated type, + // then typed_value is null and the array is null + if (column.currentDefinitionLevel() < definitionLevel) { + return null; } - // for non-arrays, advance the element iterators - for (TripleIterator child : elementReader.columns()) { - child.nextNull(); - } + ValueArray arr = Variants.array(); + do { + if (column.currentDefinitionLevel() > definitionLevel) { + arr.add(reader.read(metadata)); + } else { + // consume the empty list triple + for (TripleIterator child : children) { + child.nextNull(); + } + // if the current definition level is equal to the definition level of this repeated type, + // then the result is an empty list and the repetition level will always be <= rl. + break; + } + } while (column.currentRepetitionLevel() > repetitionLevel); - return value; + return arr; } @Override public TripleIterator column() { - return valueColumn; + return column; } @Override public List> columns() { return children; } - - @Override - public void setPageSource(PageReadStore pageStore) { - if (valueReader != null) { - valueReader.setPageSource(pageStore); - } - - elementReader.setPageSource(pageStore); - } } private static class VariantReader implements ParquetValueReader { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java b/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java index 062c89f35acf..f1f065a74ed6 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java @@ -43,6 +43,7 @@ import org.apache.parquet.schema.Type; public class VariantReaderBuilder extends ParquetVariantVisitor> { + private static final String LIST = "list"; private final MessageType schema; private final Iterable basePath; private final Deque fieldNames = Lists.newLinkedList(); @@ -66,8 +67,8 @@ private String[] currentPath() { return Streams.concat(Streams.stream(basePath), fieldNames.stream()).toArray(String[]::new); } - private String[] path(String name) { - return Streams.concat(Streams.stream(basePath), fieldNames.stream(), Stream.of(name)) + private String[] path(String... names) { + return Streams.concat(Streams.stream(basePath), fieldNames.stream(), Stream.of(names)) .toArray(String[]::new); } @@ -166,9 +167,10 @@ public VariantValueReader array( int valueDL = valueReader != null ? schema.getMaxDefinitionLevel(path(VALUE)) - 1 : Integer.MAX_VALUE; int typedDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE)) - 1; - int typedRL = schema.getMaxRepetitionLevel(path(TYPED_VALUE)) - 1; + int repeatedDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE, LIST)) - 1; + int repeatedRL = schema.getMaxRepetitionLevel(path(TYPED_VALUE, LIST)) - 1; return ParquetVariantReaders.array( - valueDL, valueReader, typedDL, typedRL, (VariantValueReader) elementResult); + valueDL, valueReader, typedDL, repeatedDL, repeatedRL, elementResult); } private static class LogicalTypeToVariantReader diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java index ef5917111ad5..954d30c2a684 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java @@ -47,8 +47,8 @@ import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.VariantType; import org.apache.iceberg.variants.PhysicalType; -import org.apache.iceberg.variants.ShreddedArray; import org.apache.iceberg.variants.ShreddedObject; +import org.apache.iceberg.variants.ValueArray; import org.apache.iceberg.variants.Variant; import org.apache.iceberg.variants.VariantMetadata; import org.apache.iceberg.variants.VariantObject; @@ -898,7 +898,7 @@ public void testMixedRecords() throws IOException { } @Test - public void testShreddedArray() throws IOException { + public void testSimpleArray() throws IOException { Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); GroupType variantType = variant("var", 2, list(shreddedType)); MessageType parquetSchema = parquetSchema(variantType); @@ -912,7 +912,7 @@ public void testShreddedArray() throws IOException { Record actual = writeAndRead(parquetSchema, row); assertThat(actual.getField("id")).isEqualTo(1); assertThat(actual.getField("var")).isInstanceOf(Variant.class); - ShreddedArray expectedArray = Variants.array(); + ValueArray expectedArray = Variants.array(); expectedArray.add(Variants.of("comedy")); expectedArray.add(Variants.of("drama")); Variant actualVariant = (Variant) actual.getField("var"); @@ -921,7 +921,7 @@ public void testShreddedArray() throws IOException { } @Test - public void testShreddedNullArray() throws IOException { + public void testNullArray() throws IOException { Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); GroupType variantType = variant("var", 2, list(shreddedType)); MessageType parquetSchema = parquetSchema(variantType); @@ -946,7 +946,7 @@ public void testShreddedNullArray() throws IOException { } @Test - public void testShreddedEmptyArray() throws IOException { + public void testEmptyArray() throws IOException { Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); GroupType variantType = variant("var", 2, list(shreddedType)); MessageType parquetSchema = parquetSchema(variantType); @@ -967,7 +967,7 @@ public void testShreddedEmptyArray() throws IOException { } @Test - public void testShreddedArrayWithNull() throws IOException { + public void testArrayWithNull() throws IOException { Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); GroupType variantType = variant("var", 2, list(shreddedType)); MessageType parquetSchema = parquetSchema(variantType); @@ -985,7 +985,7 @@ public void testShreddedArrayWithNull() throws IOException { Variant actualVariant = (Variant) actual.getField("var"); assertThat(actualVariant.value().type()).isEqualTo(PhysicalType.ARRAY); assertThat(actualVariant.value().asArray().numElements()).isEqualTo(3); - ShreddedArray expectedArray = Variants.array(); + ValueArray expectedArray = Variants.array(); expectedArray.add(Variants.of("comedy")); expectedArray.add(Variants.ofNull()); expectedArray.add(Variants.of("drama")); @@ -994,7 +994,7 @@ public void testShreddedArrayWithNull() throws IOException { } @Test - public void testShreddedArrayWithNestedArray() throws IOException { + public void testNestedArray() throws IOException { Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); GroupType innerListType = list(shreddedType); GroupType variantType = variant("var", 2, list(innerListType)); @@ -1014,11 +1014,11 @@ public void testShreddedArrayWithNestedArray() throws IOException { // Verify assertThat(actual.getField("id")).isEqualTo(1); assertThat(actual.getField("var")).isInstanceOf(Variant.class); - ShreddedArray expectedArray = Variants.array(); - ShreddedArray expectedInner1 = Variants.array(); + ValueArray expectedArray = Variants.array(); + ValueArray expectedInner1 = Variants.array(); expectedInner1.add(Variants.of("comedy")); expectedInner1.add(Variants.of("drama")); - ShreddedArray expectedInner2 = Variants.array(); + ValueArray expectedInner2 = Variants.array(); expectedArray.add(expectedInner1); expectedArray.add(expectedInner2); Variant actualVariant = (Variant) actual.getField("var"); @@ -1027,7 +1027,7 @@ public void testShreddedArrayWithNestedArray() throws IOException { } @Test - public void testShreddedArrayWithNestedObject() throws IOException { + public void testArrayWithNestedObject() throws IOException { GroupType fieldA = field("a", shreddedPrimitive(PrimitiveTypeName.INT32)); GroupType fieldB = field("b", shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); GroupType shreddedFields = objectFields(fieldA, fieldB); @@ -1072,7 +1072,7 @@ public void testShreddedArrayWithNestedObject() throws IOException { assertThat(actual1.getField("var")).isInstanceOf(Variant.class); ShreddedObject expected1 = Variants.object(TEST_METADATA); - ShreddedArray expectedArray1 = Variants.array(); + ValueArray expectedArray1 = Variants.array(); ShreddedObject expectedElement1 = Variants.object(TEST_METADATA); expectedElement1.put("a", Variants.of(1)); expectedElement1.put("b", Variants.of("comedy")); @@ -1092,7 +1092,7 @@ public void testShreddedArrayWithNestedObject() throws IOException { assertThat(actual2.getField("var")).isInstanceOf(Variant.class); ShreddedObject expected2 = Variants.object(TEST_METADATA); - ShreddedArray expectedArray2 = Variants.array(); + ValueArray expectedArray2 = Variants.array(); ShreddedObject expectedElement3 = Variants.object(TEST_METADATA); expectedElement3.put("a", Variants.of(3)); expectedElement3.put("b", Variants.of("action")); @@ -1109,7 +1109,7 @@ public void testShreddedArrayWithNestedObject() throws IOException { } @Test - public void testShreddedArrayWithNonArray() throws IOException { + public void testArrayWithNonArray() throws IOException { Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); GroupType variantType = variant("var", 2, list(shreddedType)); MessageType parquetSchema = parquetSchema(variantType); @@ -1137,7 +1137,7 @@ public void testShreddedArrayWithNonArray() throws IOException { Record actual1 = actual.get(0); assertThat(actual1.getField("id")).isEqualTo(1); assertThat(actual1.getField("var")).isInstanceOf(Variant.class); - ShreddedArray expectedArray1 = Variants.array(); + ValueArray expectedArray1 = Variants.array(); expectedArray1.add(Variants.of("comedy")); expectedArray1.add(Variants.of("drama")); Variant actualVariant1 = (Variant) actual1.getField("var"); From daa2f842474c2194f9221dc503fc62cadff66ebe Mon Sep 17 00:00:00 2001 From: Aihua Xu Date: Wed, 19 Mar 2025 09:48:45 -0700 Subject: [PATCH 04/10] Rename to ListReader --- .../org/apache/iceberg/parquet/ParquetVariantReaders.java | 6 +++--- .../org/apache/iceberg/parquet/ParquetVariantVisitor.java | 1 + .../org/apache/iceberg/parquet/VariantReaderBuilder.java | 1 - 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java index 80602d20f359..8f3944de5b24 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java @@ -104,7 +104,7 @@ public static VariantValueReader array( int repeatedRepetitionLevel, ParquetValueReader elementReader) { VariantValueReader typedReader = - new ArrayReader( + new ListReader( repeatedDefinitionLevel, repeatedRepetitionLevel, (VariantValueReader) elementReader); return shredded(valueDefinitionLevel, valueReader, typedDefinitionLevel, typedReader); } @@ -346,14 +346,14 @@ public void setPageSource(PageReadStore pageStore) { } } - private static class ArrayReader implements VariantValueReader { + private static class ListReader implements VariantValueReader { private final int definitionLevel; private final int repetitionLevel; private final VariantValueReader reader; private final TripleIterator column; private final List> children; - protected ArrayReader(int definitionLevel, int repetitionLevel, VariantValueReader reader) { + protected ListReader(int definitionLevel, int repetitionLevel, VariantValueReader reader) { this.definitionLevel = definitionLevel; this.repetitionLevel = repetitionLevel; this.reader = reader; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantVisitor.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantVisitor.java index 71d2eb26627b..d0ca00b19313 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantVisitor.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantVisitor.java @@ -31,6 +31,7 @@ public abstract class ParquetVariantVisitor { static final String METADATA = "metadata"; static final String VALUE = "value"; static final String TYPED_VALUE = "typed_value"; + static final String LIST = "list"; /** * Handles the root variant column group. diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java b/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java index f1f065a74ed6..a9e965adaa1b 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java @@ -43,7 +43,6 @@ import org.apache.parquet.schema.Type; public class VariantReaderBuilder extends ParquetVariantVisitor> { - private static final String LIST = "list"; private final MessageType schema; private final Iterable basePath; private final Deque fieldNames = Lists.newLinkedList(); From 2da7541753bdb46217cc7991d5b97ddec7f76f45 Mon Sep 17 00:00:00 2001 From: Aihua Xu Date: Wed, 19 Mar 2025 20:34:33 -0700 Subject: [PATCH 05/10] Remove unnecessary null handling in ListArray --- .../org/apache/iceberg/parquet/ParquetVariantReaders.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java index 8f3944de5b24..928e03769f02 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java @@ -368,12 +368,6 @@ public void setPageSource(PageReadStore pageStore) { @Override public ValueArray read(VariantMetadata metadata) { - // if the current definition level is less than the definition level of this repeated type, - // then typed_value is null and the array is null - if (column.currentDefinitionLevel() < definitionLevel) { - return null; - } - ValueArray arr = Variants.array(); do { if (column.currentDefinitionLevel() > definitionLevel) { From 5d8b1b71d1e137d8ff6dae752b5f59d5835a92bd Mon Sep 17 00:00:00 2001 From: Aihua Xu Date: Fri, 21 Mar 2025 16:39:47 -0700 Subject: [PATCH 06/10] Add more tests --- .../parquet/ParquetVariantReaders.java | 9 +- .../iceberg/parquet/VariantReaderBuilder.java | 10 +- .../iceberg/parquet/TestVariantReaders.java | 270 ++++++++++++------ 3 files changed, 199 insertions(+), 90 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java index 928e03769f02..dfbf50e38032 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java @@ -97,16 +97,11 @@ public static VariantValueReader objects( } public static VariantValueReader array( - int valueDefinitionLevel, - ParquetValueReader valueReader, - int typedDefinitionLevel, int repeatedDefinitionLevel, int repeatedRepetitionLevel, ParquetValueReader elementReader) { - VariantValueReader typedReader = - new ListReader( - repeatedDefinitionLevel, repeatedRepetitionLevel, (VariantValueReader) elementReader); - return shredded(valueDefinitionLevel, valueReader, typedDefinitionLevel, typedReader); + return new ListReader( + repeatedDefinitionLevel, repeatedRepetitionLevel, (VariantValueReader) elementReader); } public static VariantValueReader asVariant(PhysicalType type, ParquetValueReader reader) { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java b/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java index a9e965adaa1b..e5c20eda4f64 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.parquet; +import static org.apache.iceberg.parquet.ParquetVariantReaders.shredded; + import java.util.Deque; import java.util.List; import java.util.Optional; @@ -137,7 +139,7 @@ public VariantValueReader value( typedReader != null ? schema.getMaxDefinitionLevel(path(TYPED_VALUE)) - 1 : Integer.MAX_VALUE; - return ParquetVariantReaders.shredded(valueDL, valueReader, typedDL, typedReader); + return shredded(valueDL, valueReader, typedDL, typedReader); } @Override @@ -168,8 +170,10 @@ public VariantValueReader array( int typedDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE)) - 1; int repeatedDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE, LIST)) - 1; int repeatedRL = schema.getMaxRepetitionLevel(path(TYPED_VALUE, LIST)) - 1; - return ParquetVariantReaders.array( - valueDL, valueReader, typedDL, repeatedDL, repeatedRL, elementResult); + VariantValueReader typedReader = + ParquetVariantReaders.array(repeatedDL, repeatedRL, elementResult); + + return ParquetVariantReaders.shredded(valueDL, valueReader, typedDL, typedReader); } private static class LogicalTypeToVariantReader diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java index 954d30c2a684..b0476649084a 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java @@ -900,21 +900,24 @@ public void testMixedRecords() throws IOException { @Test public void testSimpleArray() throws IOException { Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); - GroupType variantType = variant("var", 2, list(shreddedType)); + GroupType elementType = element(shreddedType); + GroupType variantType = variant("var", 2, list(elementType)); MessageType parquetSchema = parquetSchema(variantType); - List arr = elements(shreddedType, List.of("comedy", "drama")); + List arr = elements(elementType, List.of("comedy", "drama")); GenericRecord var = record( variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); - Record actual = writeAndRead(parquetSchema, row); - assertThat(actual.getField("id")).isEqualTo(1); - assertThat(actual.getField("var")).isInstanceOf(Variant.class); ValueArray expectedArray = Variants.array(); expectedArray.add(Variants.of("comedy")); expectedArray.add(Variants.of("drama")); + + Record actual = writeAndRead(parquetSchema, row); + + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); Variant actualVariant = (Variant) actual.getField("var"); VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); VariantTestUtil.assertEqual(expectedArray, actualVariant.value()); @@ -923,7 +926,7 @@ public void testSimpleArray() throws IOException { @Test public void testNullArray() throws IOException { Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); - GroupType variantType = variant("var", 2, list(shreddedType)); + GroupType variantType = variant("var", 2, list(element(shreddedType))); MessageType parquetSchema = parquetSchema(variantType); GenericRecord var = @@ -948,7 +951,7 @@ public void testNullArray() throws IOException { @Test public void testEmptyArray() throws IOException { Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); - GroupType variantType = variant("var", 2, list(shreddedType)); + GroupType variantType = variant("var", 2, list(element(shreddedType))); MessageType parquetSchema = parquetSchema(variantType); List arr = List.of(); @@ -969,15 +972,21 @@ public void testEmptyArray() throws IOException { @Test public void testArrayWithNull() throws IOException { Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); - GroupType variantType = variant("var", 2, list(shreddedType)); + GroupType elementType = element(shreddedType); + GroupType variantType = variant("var", 2, list(elementType)); MessageType parquetSchema = parquetSchema(variantType); - List arr = elements(shreddedType, Lists.newArrayList("comedy", null, "drama")); + List arr = elements(elementType, Lists.newArrayList("comedy", null, "drama")); GenericRecord var = record( variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); + ValueArray expectedArray = Variants.array(); + expectedArray.add(Variants.of("comedy")); + expectedArray.add(Variants.ofNull()); + expectedArray.add(Variants.of("drama")); + Record actual = writeAndRead(parquetSchema, row); assertThat(actual.getField("id")).isEqualTo(1); @@ -985,10 +994,6 @@ public void testArrayWithNull() throws IOException { Variant actualVariant = (Variant) actual.getField("var"); assertThat(actualVariant.value().type()).isEqualTo(PhysicalType.ARRAY); assertThat(actualVariant.value().asArray().numElements()).isEqualTo(3); - ValueArray expectedArray = Variants.array(); - expectedArray.add(Variants.of("comedy")); - expectedArray.add(Variants.ofNull()); - expectedArray.add(Variants.of("drama")); VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); VariantTestUtil.assertEqual(expectedArray, actualVariant.value()); } @@ -996,24 +1001,20 @@ public void testArrayWithNull() throws IOException { @Test public void testNestedArray() throws IOException { Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); - GroupType innerListType = list(shreddedType); - GroupType variantType = variant("var", 2, list(innerListType)); + GroupType elementType = element(shreddedType); + GroupType outerElementType = element(list(elementType)); + GroupType variantType = variant("var", 2, list(outerElementType)); MessageType parquetSchema = parquetSchema(variantType); - List inner1 = elements(shreddedType, List.of("comedy", "drama")); - List inner2 = elements(shreddedType, List.of()); - List outer1 = elements(innerListType, List.of(inner1, inner2)); + List inner1 = elements(elementType, List.of("comedy", "drama")); + List inner2 = elements(elementType, List.of()); + List outer1 = elements(outerElementType, List.of(inner1, inner2)); GenericRecord var = record( variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", outer1)); GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); - Record actual = writeAndRead(parquetSchema, row); - - // Verify - assertThat(actual.getField("id")).isEqualTo(1); - assertThat(actual.getField("var")).isInstanceOf(Variant.class); ValueArray expectedArray = Variants.array(); ValueArray expectedInner1 = Variants.array(); expectedInner1.add(Variants.of("comedy")); @@ -1021,6 +1022,12 @@ public void testNestedArray() throws IOException { ValueArray expectedInner2 = Variants.array(); expectedArray.add(expectedInner1); expectedArray.add(expectedInner2); + + Record actual = writeAndRead(parquetSchema, row); + + // Verify + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); Variant actualVariant = (Variant) actual.getField("var"); VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); VariantTestUtil.assertEqual(expectedArray, actualVariant.value()); @@ -1031,7 +1038,8 @@ public void testArrayWithNestedObject() throws IOException { GroupType fieldA = field("a", shreddedPrimitive(PrimitiveTypeName.INT32)); GroupType fieldB = field("b", shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); GroupType shreddedFields = objectFields(fieldA, fieldB); - GroupType listType = list(shreddedFields); + GroupType elementType = element(shreddedFields); + GroupType listType = list(elementType); GroupType fieldC = field("c", listType); GroupType objectFields = objectFields(fieldC); GroupType variantType = variant("var", 2, objectFields); @@ -1044,13 +1052,25 @@ public void testArrayWithNestedObject() throws IOException { GenericRecord a2 = record(fieldA, Map.of("typed_value", 2)); GenericRecord b2 = record(fieldB, Map.of("typed_value", "drama")); GenericRecord shredded2 = record(shreddedFields, Map.of("a", a2, "b", b2)); - List arr1 = elements(shreddedFields, List.of(shredded1, shredded2)); + List arr1 = elements(elementType, List.of(shredded1, shredded2)); GenericRecord element1 = record(fieldC, Map.of("typed_value", arr1)); GenericRecord c1 = record(objectFields, Map.of("c", element1)); GenericRecord var1 = record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", c1)); GenericRecord row1 = record(parquetSchema, Map.of("id", 1, "var", var1)); + ShreddedObject expected1 = Variants.object(TEST_METADATA); + ValueArray expectedArray1 = Variants.array(); + ShreddedObject expectedElement1 = Variants.object(TEST_METADATA); + expectedElement1.put("a", Variants.of(1)); + expectedElement1.put("b", Variants.of("comedy")); + expectedArray1.add(expectedElement1); + ShreddedObject expectedElement2 = Variants.object(TEST_METADATA); + expectedElement2.put("a", Variants.of(2)); + expectedElement2.put("b", Variants.of("drama")); + expectedArray1.add(expectedElement2); + expected1.put("c", expectedArray1); + // Row 2 GenericRecord a3 = record(fieldA, Map.of("typed_value", 3)); GenericRecord b3 = record(fieldB, Map.of("typed_value", "action")); @@ -1058,31 +1078,31 @@ public void testArrayWithNestedObject() throws IOException { GenericRecord a4 = record(fieldA, Map.of("typed_value", 4)); GenericRecord b4 = record(fieldB, Map.of("typed_value", "horror")); GenericRecord shredded4 = record(shreddedFields, Map.of("a", a4, "b", b4)); - List arr2 = elements(shreddedFields, List.of(shredded3, shredded4)); + List arr2 = elements(elementType, List.of(shredded3, shredded4)); GenericRecord element2 = record(fieldC, Map.of("typed_value", arr2)); GenericRecord c2 = record(objectFields, Map.of("c", element2)); GenericRecord var2 = record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", c2)); GenericRecord row2 = record(parquetSchema, Map.of("id", 2, "var", var2)); + ShreddedObject expected2 = Variants.object(TEST_METADATA); + ValueArray expectedArray2 = Variants.array(); + ShreddedObject expectedElement3 = Variants.object(TEST_METADATA); + expectedElement3.put("a", Variants.of(3)); + expectedElement3.put("b", Variants.of("action")); + expectedArray2.add(expectedElement3); + ShreddedObject expectedElement4 = Variants.object(TEST_METADATA); + expectedElement4.put("a", Variants.of(4)); + expectedElement4.put("b", Variants.of("horror")); + expectedArray2.add(expectedElement4); + expected2.put("c", expectedArray2); + // verify List actual = writeAndRead(parquetSchema, List.of(row1, row2)); Record actual1 = actual.get(0); assertThat(actual1.getField("id")).isEqualTo(1); assertThat(actual1.getField("var")).isInstanceOf(Variant.class); - ShreddedObject expected1 = Variants.object(TEST_METADATA); - ValueArray expectedArray1 = Variants.array(); - ShreddedObject expectedElement1 = Variants.object(TEST_METADATA); - expectedElement1.put("a", Variants.of(1)); - expectedElement1.put("b", Variants.of("comedy")); - expectedArray1.add(expectedElement1); - ShreddedObject expectedElement2 = Variants.object(TEST_METADATA); - expectedElement2.put("a", Variants.of(2)); - expectedElement2.put("b", Variants.of("drama")); - expectedArray1.add(expectedElement2); - expected1.put("c", expectedArray1); - Variant actualVariant1 = (Variant) actual1.getField("var"); VariantTestUtil.assertEqual(TEST_METADATA, actualVariant1.metadata()); VariantTestUtil.assertEqual(expected1, actualVariant1.value()); @@ -1091,18 +1111,6 @@ public void testArrayWithNestedObject() throws IOException { assertThat(actual2.getField("id")).isEqualTo(2); assertThat(actual2.getField("var")).isInstanceOf(Variant.class); - ShreddedObject expected2 = Variants.object(TEST_METADATA); - ValueArray expectedArray2 = Variants.array(); - ShreddedObject expectedElement3 = Variants.object(TEST_METADATA); - expectedElement3.put("a", Variants.of(3)); - expectedElement3.put("b", Variants.of("action")); - expectedArray2.add(expectedElement3); - ShreddedObject expectedElement4 = Variants.object(TEST_METADATA); - expectedElement4.put("a", Variants.of(4)); - expectedElement4.put("b", Variants.of("horror")); - expectedArray2.add(expectedElement4); - expected2.put("c", expectedArray2); - Variant actualVariant2 = (Variant) actual2.getField("var"); VariantTestUtil.assertEqual(TEST_METADATA, actualVariant2.metadata()); VariantTestUtil.assertEqual(expected2, actualVariant2.value()); @@ -1111,15 +1119,20 @@ public void testArrayWithNestedObject() throws IOException { @Test public void testArrayWithNonArray() throws IOException { Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); - GroupType variantType = variant("var", 2, list(shreddedType)); + GroupType elementType = element(shreddedType); + GroupType variantType = variant("var", 2, list(elementType)); MessageType parquetSchema = parquetSchema(variantType); - List arr1 = elements(shreddedType, List.of("comedy", "drama")); + List arr1 = elements(elementType, List.of("comedy", "drama")); GenericRecord var1 = record( variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr1)); GenericRecord row1 = record(parquetSchema, Map.of("id", 1, "var", var1)); + ValueArray expectedArray1 = Variants.array(); + expectedArray1.add(Variants.of("comedy")); + expectedArray1.add(Variants.of("drama")); + GenericRecord var2 = record( variantType, @@ -1127,38 +1140,144 @@ public void testArrayWithNonArray() throws IOException { "metadata", VariantTestUtil.emptyMetadata(), "value", serialize(Variants.of(34)))); GenericRecord row2 = record(parquetSchema, Map.of("id", 2, "var", var2)); + VariantValue expectedValue2 = Variants.of(PhysicalType.INT32, 34); + GenericRecord var3 = record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "value", TEST_OBJECT_BUFFER)); GenericRecord row3 = record(parquetSchema, Map.of("id", 3, "var", var3)); - List actual = writeAndRead(parquetSchema, List.of(row1, row2, row3)); + ShreddedObject expectedObject3 = Variants.object(TEST_METADATA); + expectedObject3.put("a", Variants.ofNull()); + expectedObject3.put("d", Variants.of("iceberg")); + + // Test array is read properly after a non-array + List arr4 = elements(elementType, List.of("action", "horror")); + GenericRecord var4 = + record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", arr4)); + GenericRecord row4 = record(parquetSchema, Map.of("id", 4, "var", var4)); + + ValueArray expectedArray4 = Variants.array(); + expectedArray4.add(Variants.of("action")); + expectedArray4.add(Variants.of("horror")); + + List actual = writeAndRead(parquetSchema, List.of(row1, row2, row3, row4)); // Verify Record actual1 = actual.get(0); assertThat(actual1.getField("id")).isEqualTo(1); assertThat(actual1.getField("var")).isInstanceOf(Variant.class); - ValueArray expectedArray1 = Variants.array(); - expectedArray1.add(Variants.of("comedy")); - expectedArray1.add(Variants.of("drama")); Variant actualVariant1 = (Variant) actual1.getField("var"); VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant1.metadata()); + VariantTestUtil.assertEqual(expectedArray1, actualVariant1.value()); Record actual2 = actual.get(1); assertThat(actual2.getField("id")).isEqualTo(2); assertThat(actual2.getField("var")).isInstanceOf(Variant.class); Variant actualVariant2 = (Variant) actual2.getField("var"); VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant2.metadata()); - VariantTestUtil.assertEqual(Variants.of(PhysicalType.INT32, 34), actualVariant2.value()); + VariantTestUtil.assertEqual(expectedValue2, actualVariant2.value()); Record actual3 = actual.get(2); assertThat(actual3.getField("id")).isEqualTo(3); assertThat(actual3.getField("var")).isInstanceOf(Variant.class); Variant actualVariant3 = (Variant) actual3.getField("var"); VariantTestUtil.assertEqual(TEST_METADATA, actualVariant3.metadata()); - ShreddedObject expected = Variants.object(TEST_METADATA); - expected.put("a", Variants.ofNull()); - expected.put("d", Variants.of("iceberg")); - VariantTestUtil.assertEqual(expected, actualVariant3.value()); + VariantTestUtil.assertEqual(expectedObject3, actualVariant3.value()); + + Record actual4 = actual.get(3); + assertThat(actual4.getField("id")).isEqualTo(4); + assertThat(actual4.getField("var")).isInstanceOf(Variant.class); + Variant actualVariant4 = (Variant) actual4.getField("var"); + VariantTestUtil.assertEqual(TEST_METADATA, actualVariant4.metadata()); + VariantTestUtil.assertEqual(expectedArray4, actualVariant4.value()); + } + + @Test + public void testArrayMissingValueColumn() throws IOException { + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType elementType = element(shreddedType); + GroupType variantType = + Types.buildGroup(Type.Repetition.OPTIONAL) + .id(2) + .required(PrimitiveTypeName.BINARY) + .named("metadata") + .addField(list(elementType)) + .named("var"); + + MessageType parquetSchema = parquetSchema(variantType); + + List arr = elements(elementType, List.of("comedy", "drama")); + GenericRecord var = + record( + variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); + GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); + + ValueArray expectedArray = Variants.array(); + expectedArray.add(Variants.of("comedy")); + expectedArray.add(Variants.of("drama")); + + Record actual = writeAndRead(parquetSchema, row); + + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(expectedArray, actualVariant.value()); + } + + @Test + public void testArrayMissingElementValueColumn() throws IOException { + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType elementType = + Types.buildGroup(Type.Repetition.REQUIRED).addField(shreddedType).named("element"); + + GroupType variantType = variant("var", 2, list(elementType)); + MessageType parquetSchema = parquetSchema(variantType); + + List arr = elements(elementType, List.of("comedy", "drama")); + GenericRecord var = + record( + variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); + GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); + + ValueArray expectedArray = Variants.array(); + expectedArray.add(Variants.of("comedy")); + expectedArray.add(Variants.of("drama")); + + Record actual = writeAndRead(parquetSchema, row); + + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(expectedArray, actualVariant.value()); + } + + @Test + public void testArrayWithElementNullValueAndNullTypedValue() throws IOException { + // Test the invalid case that both value and typed_value of an element are null + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType elementType = element(shreddedType); + GroupType variantType = variant("var", 2, list(elementType)); + MessageType parquetSchema = parquetSchema(variantType); + + GenericRecord element = record(elementType, Map.of()); + GenericRecord variant = + record( + variantType, + Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", List.of(element))); + GenericRecord record = record(parquetSchema, Map.of("id", 1, "var", variant)); + + Record actual = writeAndRead(parquetSchema, record); + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantValue actualValue = actualVariant.value(); + assertThat(actualValue.type()).isEqualTo(PhysicalType.ARRAY); + assertThat(actualValue.asArray().numElements()).isEqualTo(1); + assertThat(actualValue.asArray().get(0)).isNull(); } private static ByteBuffer serialize(VariantValue value) { @@ -1182,25 +1301,16 @@ private static GenericRecord record(GroupType type, Map fields) return record; } - private static List elements(Type shreddedType, List elements) { - GroupType elementType = - Types.buildGroup(Type.Repetition.REQUIRED) - .addField( - Types.primitive(PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL).named("value")) - .addField(shreddedType) - .named("element"); - org.apache.avro.Schema elementSchema = avroSchema(elementType); - + private static List elements(GroupType elementType, List elements) { + // TODO List elementRecords = Lists.newArrayList(); if (elements != null) { for (T element : elements) { - GenericRecord elementRecord = new GenericData.Record(elementSchema); if (element != null) { - elementRecord.put("typed_value", element); + elementRecords.add(record(elementType, Map.of("typed_value", element))); } else { - elementRecord.put("value", serialize(Variants.ofNull())); + elementRecords.add(record(elementType, Map.of("value", serialize(Variants.ofNull())))); } - elementRecords.add(elementRecord); } } @@ -1405,12 +1515,12 @@ private static GroupType field(String name, Type shreddedType) { .named(name); } - private static GroupType list(Type shreddedType) { - return Types.optionalList() - .requiredGroupElement() - .addField(Types.optional(PrimitiveTypeName.BINARY).named("value")) - .addField(shreddedType) - .named("typed_value"); + private static GroupType element(Type shreddedType) { + return field("element", shreddedType); + } + + private static GroupType list(GroupType elementType) { + return Types.optionalList().element(elementType).named("typed_value"); } private static org.apache.avro.Schema avroSchema(GroupType schema) { From fd2c2955442f18bdfa134122a419ebe599912090 Mon Sep 17 00:00:00 2001 From: Aihua Xu Date: Mon, 24 Mar 2025 11:10:58 -0700 Subject: [PATCH 07/10] Add shredding logic in TestVariantReaders --- .../iceberg/variants/VariantTestUtil.java | 6 + .../org/apache/iceberg/variants/Variants.java | 9 +- .../iceberg/parquet/VariantReaderBuilder.java | 4 +- .../iceberg/parquet/TestVariantReaders.java | 176 +++++++++++++++--- 4 files changed, 160 insertions(+), 35 deletions(-) diff --git a/api/src/test/java/org/apache/iceberg/variants/VariantTestUtil.java b/api/src/test/java/org/apache/iceberg/variants/VariantTestUtil.java index f06f481e6eee..c5a3d32d7615 100644 --- a/api/src/test/java/org/apache/iceberg/variants/VariantTestUtil.java +++ b/api/src/test/java/org/apache/iceberg/variants/VariantTestUtil.java @@ -107,6 +107,12 @@ static SerializedPrimitive createString(String string) { return SerializedPrimitive.from(buffer, buffer.get(0)); } + public static ByteBuffer valueBuffer(VariantValue value) { + ByteBuffer buffer = ByteBuffer.allocate(value.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); + value.writeTo(buffer, 0); + return buffer; + } + public static ByteBuffer variantBuffer(Map data) { ByteBuffer meta = VariantTestUtil.createMetadata(data.keySet(), true /* sort names */); ByteBuffer value = VariantTestUtil.createObject(meta, data); diff --git a/core/src/main/java/org/apache/iceberg/variants/Variants.java b/core/src/main/java/org/apache/iceberg/variants/Variants.java index 5591145ca603..a7be1c27902b 100644 --- a/core/src/main/java/org/apache/iceberg/variants/Variants.java +++ b/core/src/main/java/org/apache/iceberg/variants/Variants.java @@ -121,8 +121,13 @@ public static boolean isNull(ByteBuffer valueBuffer) { return VariantUtil.readByte(valueBuffer, 0) == 0; } - public static ValueArray array() { - return new ValueArray(); + public static ValueArray array(VariantValue... elements) { + ValueArray arr = new ValueArray(); + for (VariantValue element : elements) { + arr.add(element); + } + + return arr; } public static VariantPrimitive of(PhysicalType type, T value) { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java b/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java index e5c20eda4f64..d679fd5a53fd 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java @@ -18,8 +18,6 @@ */ package org.apache.iceberg.parquet; -import static org.apache.iceberg.parquet.ParquetVariantReaders.shredded; - import java.util.Deque; import java.util.List; import java.util.Optional; @@ -139,7 +137,7 @@ public VariantValueReader value( typedReader != null ? schema.getMaxDefinitionLevel(path(TYPED_VALUE)) - 1 : Integer.MAX_VALUE; - return shredded(valueDL, valueReader, typedDL, typedReader); + return ParquetVariantReaders.shredded(valueDL, valueReader, typedDL, typedReader); } @Override diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java index b0476649084a..0a07bbc1e31b 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java @@ -27,6 +27,8 @@ import java.nio.ByteOrder; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -42,6 +44,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.types.Types.IntegerType; import org.apache.iceberg.types.Types.NestedField; @@ -50,6 +54,7 @@ import org.apache.iceberg.variants.ShreddedObject; import org.apache.iceberg.variants.ValueArray; import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantArray; import org.apache.iceberg.variants.VariantMetadata; import org.apache.iceberg.variants.VariantObject; import org.apache.iceberg.variants.VariantPrimitive; @@ -904,7 +909,8 @@ public void testSimpleArray() throws IOException { GroupType variantType = variant("var", 2, list(elementType)); MessageType parquetSchema = parquetSchema(variantType); - List arr = elements(elementType, List.of("comedy", "drama")); + List arr = + elements(elementType, List.of(Variants.of("comedy"), Variants.of("drama"))); GenericRecord var = record( variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); @@ -976,7 +982,11 @@ public void testArrayWithNull() throws IOException { GroupType variantType = variant("var", 2, list(elementType)); MessageType parquetSchema = parquetSchema(variantType); - List arr = elements(elementType, Lists.newArrayList("comedy", null, "drama")); + List arr = + elements( + elementType, + Lists.newArrayList( + List.of(Variants.of("comedy"), Variants.ofNull(), Variants.of("drama")))); GenericRecord var = record( variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); @@ -1006,9 +1016,10 @@ public void testNestedArray() throws IOException { GroupType variantType = variant("var", 2, list(outerElementType)); MessageType parquetSchema = parquetSchema(variantType); - List inner1 = elements(elementType, List.of("comedy", "drama")); - List inner2 = elements(elementType, List.of()); - List outer1 = elements(outerElementType, List.of(inner1, inner2)); + List outer1 = + elements( + outerElementType, + List.of(Variants.array(Variants.of("comedy"), Variants.of("drama")), Variants.array())); GenericRecord var = record( variantType, @@ -1046,12 +1057,15 @@ public void testArrayWithNestedObject() throws IOException { MessageType parquetSchema = parquetSchema(variantType); // Row 1 - GenericRecord a1 = record(fieldA, Map.of("typed_value", 1)); - GenericRecord b1 = record(fieldB, Map.of("typed_value", "comedy")); - GenericRecord shredded1 = record(shreddedFields, Map.of("a", a1, "b", b1)); - GenericRecord a2 = record(fieldA, Map.of("typed_value", 2)); - GenericRecord b2 = record(fieldB, Map.of("typed_value", "drama")); - GenericRecord shredded2 = record(shreddedFields, Map.of("a", a2, "b", b2)); + ByteBuffer shreddedBuffer1 = + VariantTestUtil.createObject( + TEST_METADATA_BUFFER, Map.of("a", Variants.of(1), "b", Variants.of("comedy"))); + VariantValue shredded1 = Variants.value(TEST_METADATA, shreddedBuffer1); + ByteBuffer shreddedBuffer2 = + VariantTestUtil.createObject( + TEST_METADATA_BUFFER, Map.of("a", Variants.of(2), "b", Variants.of("drama"))); + VariantValue shredded2 = Variants.value(TEST_METADATA, shreddedBuffer2); + List arr1 = elements(elementType, List.of(shredded1, shredded2)); GenericRecord element1 = record(fieldC, Map.of("typed_value", arr1)); GenericRecord c1 = record(objectFields, Map.of("c", element1)); @@ -1072,12 +1086,14 @@ public void testArrayWithNestedObject() throws IOException { expected1.put("c", expectedArray1); // Row 2 - GenericRecord a3 = record(fieldA, Map.of("typed_value", 3)); - GenericRecord b3 = record(fieldB, Map.of("typed_value", "action")); - GenericRecord shredded3 = record(shreddedFields, Map.of("a", a3, "b", b3)); - GenericRecord a4 = record(fieldA, Map.of("typed_value", 4)); - GenericRecord b4 = record(fieldB, Map.of("typed_value", "horror")); - GenericRecord shredded4 = record(shreddedFields, Map.of("a", a4, "b", b4)); + ByteBuffer shreddedBuffer3 = + VariantTestUtil.createObject( + TEST_METADATA_BUFFER, Map.of("a", Variants.of(3), "b", Variants.of("action"))); + VariantValue shredded3 = Variants.value(TEST_METADATA, shreddedBuffer3); + ByteBuffer shreddedBuffer4 = + VariantTestUtil.createObject( + TEST_METADATA_BUFFER, Map.of("a", Variants.of(4), "b", Variants.of("horror"))); + VariantValue shredded4 = Variants.value(TEST_METADATA, shreddedBuffer4); List arr2 = elements(elementType, List.of(shredded3, shredded4)); GenericRecord element2 = record(fieldC, Map.of("typed_value", arr2)); GenericRecord c2 = record(objectFields, Map.of("c", element2)); @@ -1123,7 +1139,8 @@ public void testArrayWithNonArray() throws IOException { GroupType variantType = variant("var", 2, list(elementType)); MessageType parquetSchema = parquetSchema(variantType); - List arr1 = elements(elementType, List.of("comedy", "drama")); + List arr1 = + elements(elementType, List.of(Variants.of("comedy"), Variants.of("drama"))); GenericRecord var1 = record( variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr1)); @@ -1151,7 +1168,8 @@ public void testArrayWithNonArray() throws IOException { expectedObject3.put("d", Variants.of("iceberg")); // Test array is read properly after a non-array - List arr4 = elements(elementType, List.of("action", "horror")); + List arr4 = + elements(elementType, List.of(Variants.of("action"), Variants.of("horror"))); GenericRecord var4 = record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", arr4)); GenericRecord row4 = record(parquetSchema, Map.of("id", 4, "var", var4)); @@ -1206,7 +1224,8 @@ public void testArrayMissingValueColumn() throws IOException { MessageType parquetSchema = parquetSchema(variantType); - List arr = elements(elementType, List.of("comedy", "drama")); + List arr = + elements(elementType, List.of(Variants.of("comedy"), Variants.of("drama"))); GenericRecord var = record( variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); @@ -1234,7 +1253,8 @@ public void testArrayMissingElementValueColumn() throws IOException { GroupType variantType = variant("var", 2, list(elementType)); MessageType parquetSchema = parquetSchema(variantType); - List arr = elements(elementType, List.of("comedy", "drama")); + List arr = + elements(elementType, List.of(Variants.of("comedy"), Variants.of("drama"))); GenericRecord var = record( variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); @@ -1301,20 +1321,100 @@ private static GenericRecord record(GroupType type, Map fields) return record; } - private static List elements(GroupType elementType, List elements) { - // TODO + private static List elements(GroupType elementType, List elements) { List elementRecords = Lists.newArrayList(); - if (elements != null) { - for (T element : elements) { - if (element != null) { - elementRecords.add(record(elementType, Map.of("typed_value", element))); - } else { - elementRecords.add(record(elementType, Map.of("value", serialize(Variants.ofNull())))); + for (VariantValue element : elements) { + elementRecords.add(shred(elementType, element)); + } + + return elementRecords; + } + + private static GenericRecord shred(GroupType fieldType, VariantValue value) { + checkField(fieldType); + + switch (value.type()) { + case OBJECT: + return shredObject(fieldType, value.asObject()); + case ARRAY: + return shredArray(fieldType, value.asArray()); + default: + return shredPrimitive(fieldType, value.asPrimitive()); + } + } + + private static GenericRecord shredPrimitive(GroupType fieldType, VariantPrimitive primitive) { + Type shreddedType = fieldType.getType("typed_value"); + if (shreddedType.isPrimitive() + && primitive.type() != PhysicalType.NULL + && shreddedType(primitive).equals(shreddedType)) { + return record(fieldType, Map.of("typed_value", primitive.get())); + } + + return record(fieldType, Map.of("value", VariantTestUtil.valueBuffer(primitive))); + } + + private static GenericRecord shredArray(GroupType fieldType, VariantArray array) { + Type shreddedType = fieldType.getType("typed_value"); + if (shreddedType.getLogicalTypeAnnotation() + instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) { + checkListType(shreddedType.asGroupType()); + + List list = Lists.newArrayList(); + for (int i = 0; i < array.numElements(); i++) { + list.add( + shred( + shreddedType + .asGroupType() + .getFields() + .get(0) + .asGroupType() + .getFields() + .get(0) + .asGroupType(), + array.get(i))); + } + + return record(fieldType, Map.of("typed_value", list)); + } + + return record(fieldType, Map.of("value", VariantTestUtil.valueBuffer(array))); + } + + private static GenericRecord shredObject(GroupType fieldType, VariantObject object) { + Type shreddedType = fieldType.getType("typed_value"); + if (!shreddedType.isPrimitive()) { + Set unshreddedFieldNames = Sets.newHashSet(); + Iterables.addAll(unshreddedFieldNames, object.fieldNames()); + + // Shredded + Map shredded = Maps.newHashMap(); + for (Type subType : shreddedType.asGroupType().getFields()) { + String subName = subType.getName(); + VariantValue subValue = object.get(subName); + if (subValue != null) { + unshreddedFieldNames.remove(subName); + shredded.put( + subName, shred(shreddedType.asGroupType().getType(subName).asGroupType(), subValue)); } } + + // Unshredded + ByteBuffer metadataBuffer = VariantTestUtil.createMetadata(unshreddedFieldNames, false); + Map unshreddedFields = + unshreddedFieldNames.stream().collect(Collectors.toMap(name -> name, object::get)); + ByteBuffer unshreddedBuffer = VariantTestUtil.createObject(metadataBuffer, unshreddedFields); + + return record( + fieldType, + Map.of( + "typed_value", + record(shreddedType.asGroupType(), shredded), + "value", + unshreddedBuffer)); } - return elementRecords; + return record(fieldType, Map.of("value", VariantTestUtil.valueBuffer(object))); } /** @@ -1523,6 +1623,22 @@ private static GroupType list(GroupType elementType) { return Types.optionalList().element(elementType).named("typed_value"); } + private static void checkListType(GroupType listType) { + // Check the list is a 3-level structure + Preconditions.checkArgument( + listType.getFieldCount() == 1 + && listType.getFields().get(0).isRepetition(Type.Repetition.REPEATED), + "Invalid list type: does not contain single repeated field: %s", + listType); + + GroupType repeated = listType.getFields().get(0).asGroupType(); + Preconditions.checkArgument( + repeated.getFieldCount() == 1 + && repeated.getFields().get(0).isRepetition(Type.Repetition.REQUIRED), + "Invalid list type: does not contain single required subfield: %s", + listType); + } + private static org.apache.avro.Schema avroSchema(GroupType schema) { if (schema instanceof MessageType) { return new AvroSchemaConverter(CONF).convert((MessageType) schema); From 6519f0a269f5bcdbef6f04ef7f7e177dd5c61580 Mon Sep 17 00:00:00 2001 From: Aihua Xu Date: Fri, 18 Apr 2025 12:13:49 -0700 Subject: [PATCH 08/10] Rename to ArrayReader --- .../main/java/org/apache/iceberg/variants/Variants.java | 9 ++------- .../apache/iceberg/parquet/ParquetVariantReaders.java | 6 +++--- .../org/apache/iceberg/parquet/VariantReaderBuilder.java | 4 ++-- .../org/apache/iceberg/parquet/TestVariantReaders.java | 9 +++++---- 4 files changed, 12 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/variants/Variants.java b/core/src/main/java/org/apache/iceberg/variants/Variants.java index a7be1c27902b..5591145ca603 100644 --- a/core/src/main/java/org/apache/iceberg/variants/Variants.java +++ b/core/src/main/java/org/apache/iceberg/variants/Variants.java @@ -121,13 +121,8 @@ public static boolean isNull(ByteBuffer valueBuffer) { return VariantUtil.readByte(valueBuffer, 0) == 0; } - public static ValueArray array(VariantValue... elements) { - ValueArray arr = new ValueArray(); - for (VariantValue element : elements) { - arr.add(element); - } - - return arr; + public static ValueArray array() { + return new ValueArray(); } public static VariantPrimitive of(PhysicalType type, T value) { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java index dfbf50e38032..67b4de63d0b0 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java @@ -100,7 +100,7 @@ public static VariantValueReader array( int repeatedDefinitionLevel, int repeatedRepetitionLevel, ParquetValueReader elementReader) { - return new ListReader( + return new ArrayReader( repeatedDefinitionLevel, repeatedRepetitionLevel, (VariantValueReader) elementReader); } @@ -341,14 +341,14 @@ public void setPageSource(PageReadStore pageStore) { } } - private static class ListReader implements VariantValueReader { + private static class ArrayReader implements VariantValueReader { private final int definitionLevel; private final int repetitionLevel; private final VariantValueReader reader; private final TripleIterator column; private final List> children; - protected ListReader(int definitionLevel, int repetitionLevel, VariantValueReader reader) { + protected ArrayReader(int definitionLevel, int repetitionLevel, VariantValueReader reader) { this.definitionLevel = definitionLevel; this.repetitionLevel = repetitionLevel; this.reader = reader; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java b/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java index d679fd5a53fd..29ca90034623 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java @@ -162,14 +162,14 @@ public VariantValueReader object( @Override public VariantValueReader array( - GroupType array, ParquetValueReader valueReader, ParquetValueReader elementResult) { + GroupType array, ParquetValueReader valueReader, ParquetValueReader elementReader) { int valueDL = valueReader != null ? schema.getMaxDefinitionLevel(path(VALUE)) - 1 : Integer.MAX_VALUE; int typedDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE)) - 1; int repeatedDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE, LIST)) - 1; int repeatedRL = schema.getMaxRepetitionLevel(path(TYPED_VALUE, LIST)) - 1; VariantValueReader typedReader = - ParquetVariantReaders.array(repeatedDL, repeatedRL, elementResult); + ParquetVariantReaders.array(repeatedDL, repeatedRL, elementReader); return ParquetVariantReaders.shredded(valueDL, valueReader, typedDL, typedReader); } diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java index 0a07bbc1e31b..6246be7e6002 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java @@ -1016,10 +1016,11 @@ public void testNestedArray() throws IOException { GroupType variantType = variant("var", 2, list(outerElementType)); MessageType parquetSchema = parquetSchema(variantType); - List outer1 = - elements( - outerElementType, - List.of(Variants.array(Variants.of("comedy"), Variants.of("drama")), Variants.array())); + ValueArray inner1 = Variants.array(); + inner1.add(Variants.of("comedy")); + inner1.add(Variants.of("drama")); + + List outer1 = elements(outerElementType, List.of(inner1, Variants.array())); GenericRecord var = record( variantType, From deff5f2ea616cf4d1b64da38f16a23a0b8ca2f57 Mon Sep 17 00:00:00 2001 From: Aihua Xu Date: Thu, 24 Apr 2025 21:35:05 -0700 Subject: [PATCH 09/10] Update test to construct records directly --- .../iceberg/variants/VariantTestUtil.java | 6 - .../iceberg/variants/TestShreddedObject.java | 2 +- .../iceberg/variants/TestValueArray.java | 6 +- .../iceberg/parquet/TestVariantReaders.java | 264 ++++++++---------- 4 files changed, 113 insertions(+), 165 deletions(-) diff --git a/api/src/test/java/org/apache/iceberg/variants/VariantTestUtil.java b/api/src/test/java/org/apache/iceberg/variants/VariantTestUtil.java index c5a3d32d7615..f06f481e6eee 100644 --- a/api/src/test/java/org/apache/iceberg/variants/VariantTestUtil.java +++ b/api/src/test/java/org/apache/iceberg/variants/VariantTestUtil.java @@ -107,12 +107,6 @@ static SerializedPrimitive createString(String string) { return SerializedPrimitive.from(buffer, buffer.get(0)); } - public static ByteBuffer valueBuffer(VariantValue value) { - ByteBuffer buffer = ByteBuffer.allocate(value.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); - value.writeTo(buffer, 0); - return buffer; - } - public static ByteBuffer variantBuffer(Map data) { ByteBuffer meta = VariantTestUtil.createMetadata(data.keySet(), true /* sort names */); ByteBuffer value = VariantTestUtil.createObject(meta, data); diff --git a/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java b/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java index 9d9df2bb87f7..66d5c9911a79 100644 --- a/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java +++ b/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java @@ -222,7 +222,7 @@ public void testPartiallyShreddedObjectSerializationLargeBuffer() { public void testMultiByteOffsets(int len) { // Use a string exceeding 255 bytes to test value offset sizes of 2, 3, and 4 bytes String randomString = RandomUtil.generateString(len, random); - VariantPrimitive bigString = Variants.of(randomString); + VariantPrimitive bigString = Variants.of(randomString); Map data = Maps.newHashMap(); data.putAll(FIELDS); diff --git a/core/src/test/java/org/apache/iceberg/variants/TestValueArray.java b/core/src/test/java/org/apache/iceberg/variants/TestValueArray.java index 6a2fec626b9f..f500f6106573 100644 --- a/core/src/test/java/org/apache/iceberg/variants/TestValueArray.java +++ b/core/src/test/java/org/apache/iceberg/variants/TestValueArray.java @@ -94,7 +94,7 @@ public void testSerializationLargeBuffer() { public void testMultiByteOffsets(int len) { // Use a string exceeding 255 bytes to test value offset sizes of 2, 3, and 4 bytes String randomString = RandomUtil.generateString(len, random); - VariantPrimitive bigString = Variants.of(randomString); + VariantPrimitive bigString = Variants.of(randomString); List data = Lists.newArrayList(); data.addAll(ELEMENTS); @@ -119,12 +119,12 @@ public void testMultiByteOffsets(int len) { @Test public void testLargeArray() { - List> elements = Lists.newArrayList(); + List elements = Lists.newArrayList(); for (int i = 0; i < 10_000; i += 1) { elements.add(Variants.of(RandomUtil.generateString(10, random))); } - ValueArray arr = createArray((List) elements); + ValueArray arr = createArray(elements); VariantValue value = roundTripLargeBuffer(arr); assertThat(value.type()).isEqualTo(PhysicalType.ARRAY); diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java index 6246be7e6002..81946ed34dea 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java @@ -27,8 +27,6 @@ import java.nio.ByteOrder; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -44,8 +42,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.types.Types.IntegerType; import org.apache.iceberg.types.Types.NestedField; @@ -54,7 +50,6 @@ import org.apache.iceberg.variants.ShreddedObject; import org.apache.iceberg.variants.ValueArray; import org.apache.iceberg.variants.Variant; -import org.apache.iceberg.variants.VariantArray; import org.apache.iceberg.variants.VariantMetadata; import org.apache.iceberg.variants.VariantObject; import org.apache.iceberg.variants.VariantPrimitive; @@ -910,7 +905,10 @@ public void testSimpleArray() throws IOException { MessageType parquetSchema = parquetSchema(variantType); List arr = - elements(elementType, List.of(Variants.of("comedy"), Variants.of("drama"))); + List.of( + record(elementType, Map.of("typed_value", "comedy")), + record(elementType, Map.of("typed_value", "drama"))); + GenericRecord var = record( variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); @@ -983,10 +981,11 @@ public void testArrayWithNull() throws IOException { MessageType parquetSchema = parquetSchema(variantType); List arr = - elements( - elementType, - Lists.newArrayList( - List.of(Variants.of("comedy"), Variants.ofNull(), Variants.of("drama")))); + List.of( + record(elementType, Map.of("typed_value", "comedy")), + record(elementType, Map.of("value", serialize(Variants.ofNull()))), + record(elementType, Map.of("typed_value", "drama"))); + GenericRecord var = record( variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); @@ -1016,11 +1015,14 @@ public void testNestedArray() throws IOException { GroupType variantType = variant("var", 2, list(outerElementType)); MessageType parquetSchema = parquetSchema(variantType); - ValueArray inner1 = Variants.array(); - inner1.add(Variants.of("comedy")); - inner1.add(Variants.of("drama")); - - List outer1 = elements(outerElementType, List.of(inner1, Variants.array())); + List inner1 = + List.of( + record(elementType, Map.of("typed_value", "comedy")), + record(elementType, Map.of("typed_value", "drama"))); + List outer1 = + List.of( + record(outerElementType, Map.of("typed_value", inner1)), + record(outerElementType, Map.of("typed_value", List.of()))); GenericRecord var = record( variantType, @@ -1052,67 +1054,86 @@ public void testArrayWithNestedObject() throws IOException { GroupType shreddedFields = objectFields(fieldA, fieldB); GroupType elementType = element(shreddedFields); GroupType listType = list(elementType); - GroupType fieldC = field("c", listType); - GroupType objectFields = objectFields(fieldC); - GroupType variantType = variant("var", 2, objectFields); + GroupType variantType = variant("var", 2, listType); MessageType parquetSchema = parquetSchema(variantType); - // Row 1 - ByteBuffer shreddedBuffer1 = - VariantTestUtil.createObject( - TEST_METADATA_BUFFER, Map.of("a", Variants.of(1), "b", Variants.of("comedy"))); - VariantValue shredded1 = Variants.value(TEST_METADATA, shreddedBuffer1); - ByteBuffer shreddedBuffer2 = - VariantTestUtil.createObject( - TEST_METADATA_BUFFER, Map.of("a", Variants.of(2), "b", Variants.of("drama"))); - VariantValue shredded2 = Variants.value(TEST_METADATA, shreddedBuffer2); - - List arr1 = elements(elementType, List.of(shredded1, shredded2)); - GenericRecord element1 = record(fieldC, Map.of("typed_value", arr1)); - GenericRecord c1 = record(objectFields, Map.of("c", element1)); + // Row 1 with nested fully shredded object + GenericRecord shredded1 = + record( + shreddedFields, + Map.of( + "a", + record(fieldA, Map.of("typed_value", 1)), + "b", + record(fieldB, Map.of("typed_value", "comedy")))); + GenericRecord shredded2 = + record( + shreddedFields, + Map.of( + "a", + record(fieldA, Map.of("typed_value", 2)), + "b", + record(fieldB, Map.of("typed_value", "drama")))); + List arr1 = + List.of( + record(elementType, Map.of("typed_value", shredded1)), + record(elementType, Map.of("typed_value", shredded2))); GenericRecord var1 = - record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", c1)); + record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", arr1)); GenericRecord row1 = record(parquetSchema, Map.of("id", 1, "var", var1)); - ShreddedObject expected1 = Variants.object(TEST_METADATA); - ValueArray expectedArray1 = Variants.array(); + ValueArray expected1 = Variants.array(); ShreddedObject expectedElement1 = Variants.object(TEST_METADATA); expectedElement1.put("a", Variants.of(1)); expectedElement1.put("b", Variants.of("comedy")); - expectedArray1.add(expectedElement1); + expected1.add(expectedElement1); ShreddedObject expectedElement2 = Variants.object(TEST_METADATA); expectedElement2.put("a", Variants.of(2)); expectedElement2.put("b", Variants.of("drama")); - expectedArray1.add(expectedElement2); - expected1.put("c", expectedArray1); + expected1.add(expectedElement2); - // Row 2 - ByteBuffer shreddedBuffer3 = - VariantTestUtil.createObject( - TEST_METADATA_BUFFER, Map.of("a", Variants.of(3), "b", Variants.of("action"))); - VariantValue shredded3 = Variants.value(TEST_METADATA, shreddedBuffer3); - ByteBuffer shreddedBuffer4 = - VariantTestUtil.createObject( - TEST_METADATA_BUFFER, Map.of("a", Variants.of(4), "b", Variants.of("horror"))); - VariantValue shredded4 = Variants.value(TEST_METADATA, shreddedBuffer4); - List arr2 = elements(elementType, List.of(shredded3, shredded4)); - GenericRecord element2 = record(fieldC, Map.of("typed_value", arr2)); - GenericRecord c2 = record(objectFields, Map.of("c", element2)); + // Row 2 with nested partially shredded object + GenericRecord shredded3 = + record( + shreddedFields, + Map.of( + "a", + record(fieldA, Map.of("typed_value", 3)), + "b", + record(fieldB, Map.of("typed_value", "action")))); + ShreddedObject baseObject3 = Variants.object(TEST_METADATA); + baseObject3.put("c", Variants.of("str")); + + GenericRecord shredded4 = + record( + shreddedFields, + Map.of( + "a", + record(fieldA, Map.of("typed_value", 4)), + "b", + record(fieldB, Map.of("typed_value", "horror")))); + ShreddedObject baseObject4 = Variants.object(TEST_METADATA); + baseObject4.put("d", Variants.ofIsoDate("2024-01-30")); + + List arr2 = + List.of( + record(elementType, Map.of("value", serialize(baseObject3), "typed_value", shredded3)), + record(elementType, Map.of("value", serialize(baseObject4), "typed_value", shredded4))); GenericRecord var2 = - record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", c2)); + record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", arr2)); GenericRecord row2 = record(parquetSchema, Map.of("id", 2, "var", var2)); - ShreddedObject expected2 = Variants.object(TEST_METADATA); - ValueArray expectedArray2 = Variants.array(); + ValueArray expected2 = Variants.array(); ShreddedObject expectedElement3 = Variants.object(TEST_METADATA); expectedElement3.put("a", Variants.of(3)); expectedElement3.put("b", Variants.of("action")); - expectedArray2.add(expectedElement3); + expectedElement3.put("c", Variants.of("str")); + expected2.add(expectedElement3); ShreddedObject expectedElement4 = Variants.object(TEST_METADATA); expectedElement4.put("a", Variants.of(4)); expectedElement4.put("b", Variants.of("horror")); - expectedArray2.add(expectedElement4); - expected2.put("c", expectedArray2); + expectedElement4.put("d", Variants.ofIsoDate("2024-01-30")); + expected2.add(expectedElement4); // verify List actual = writeAndRead(parquetSchema, List.of(row1, row2)); @@ -1141,7 +1162,9 @@ public void testArrayWithNonArray() throws IOException { MessageType parquetSchema = parquetSchema(variantType); List arr1 = - elements(elementType, List.of(Variants.of("comedy"), Variants.of("drama"))); + List.of( + record(elementType, Map.of("typed_value", "comedy")), + record(elementType, Map.of("typed_value", "drama"))); GenericRecord var1 = record( variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr1)); @@ -1170,7 +1193,9 @@ public void testArrayWithNonArray() throws IOException { // Test array is read properly after a non-array List arr4 = - elements(elementType, List.of(Variants.of("action"), Variants.of("horror"))); + List.of( + record(elementType, Map.of("typed_value", "action")), + record(elementType, Map.of("typed_value", "horror"))); GenericRecord var4 = record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", arr4)); GenericRecord row4 = record(parquetSchema, Map.of("id", 4, "var", var4)); @@ -1226,7 +1251,9 @@ public void testArrayMissingValueColumn() throws IOException { MessageType parquetSchema = parquetSchema(variantType); List arr = - elements(elementType, List.of(Variants.of("comedy"), Variants.of("drama"))); + List.of( + record(elementType, Map.of("typed_value", "comedy")), + record(elementType, Map.of("typed_value", "drama"))); GenericRecord var = record( variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); @@ -1255,7 +1282,9 @@ public void testArrayMissingElementValueColumn() throws IOException { MessageType parquetSchema = parquetSchema(variantType); List arr = - elements(elementType, List.of(Variants.of("comedy"), Variants.of("drama"))); + List.of( + record(elementType, Map.of("typed_value", "comedy")), + record(elementType, Map.of("typed_value", "drama"))); GenericRecord var = record( variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); @@ -1301,6 +1330,27 @@ public void testArrayWithElementNullValueAndNullTypedValue() throws IOException assertThat(actualValue.asArray().get(0)).isNull(); } + @Test + public void testArrayWithElementValueTypedValueConflict() { + // Test the invalid case that both value and typed_value of an element are not null + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType elementType = element(shreddedType); + GroupType variantType = variant("var", 2, list(elementType)); + MessageType parquetSchema = parquetSchema(variantType); + + GenericRecord element = + record(elementType, Map.of("value", serialize(Variants.of(3)), "typed_value", "comedy")); + GenericRecord variant = + record( + variantType, + Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", List.of(element))); + GenericRecord record = record(parquetSchema, Map.of("id", 1, "var", variant)); + + assertThatThrownBy(() -> writeAndRead(parquetSchema, record)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid variant, conflicting value and typed_value"); + } + private static ByteBuffer serialize(VariantValue value) { ByteBuffer buffer = ByteBuffer.allocate(value.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); value.writeTo(buffer, 0); @@ -1322,102 +1372,6 @@ private static GenericRecord record(GroupType type, Map fields) return record; } - private static List elements(GroupType elementType, List elements) { - List elementRecords = Lists.newArrayList(); - for (VariantValue element : elements) { - elementRecords.add(shred(elementType, element)); - } - - return elementRecords; - } - - private static GenericRecord shred(GroupType fieldType, VariantValue value) { - checkField(fieldType); - - switch (value.type()) { - case OBJECT: - return shredObject(fieldType, value.asObject()); - case ARRAY: - return shredArray(fieldType, value.asArray()); - default: - return shredPrimitive(fieldType, value.asPrimitive()); - } - } - - private static GenericRecord shredPrimitive(GroupType fieldType, VariantPrimitive primitive) { - Type shreddedType = fieldType.getType("typed_value"); - if (shreddedType.isPrimitive() - && primitive.type() != PhysicalType.NULL - && shreddedType(primitive).equals(shreddedType)) { - return record(fieldType, Map.of("typed_value", primitive.get())); - } - - return record(fieldType, Map.of("value", VariantTestUtil.valueBuffer(primitive))); - } - - private static GenericRecord shredArray(GroupType fieldType, VariantArray array) { - Type shreddedType = fieldType.getType("typed_value"); - if (shreddedType.getLogicalTypeAnnotation() - instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) { - checkListType(shreddedType.asGroupType()); - - List list = Lists.newArrayList(); - for (int i = 0; i < array.numElements(); i++) { - list.add( - shred( - shreddedType - .asGroupType() - .getFields() - .get(0) - .asGroupType() - .getFields() - .get(0) - .asGroupType(), - array.get(i))); - } - - return record(fieldType, Map.of("typed_value", list)); - } - - return record(fieldType, Map.of("value", VariantTestUtil.valueBuffer(array))); - } - - private static GenericRecord shredObject(GroupType fieldType, VariantObject object) { - Type shreddedType = fieldType.getType("typed_value"); - if (!shreddedType.isPrimitive()) { - Set unshreddedFieldNames = Sets.newHashSet(); - Iterables.addAll(unshreddedFieldNames, object.fieldNames()); - - // Shredded - Map shredded = Maps.newHashMap(); - for (Type subType : shreddedType.asGroupType().getFields()) { - String subName = subType.getName(); - VariantValue subValue = object.get(subName); - if (subValue != null) { - unshreddedFieldNames.remove(subName); - shredded.put( - subName, shred(shreddedType.asGroupType().getType(subName).asGroupType(), subValue)); - } - } - - // Unshredded - ByteBuffer metadataBuffer = VariantTestUtil.createMetadata(unshreddedFieldNames, false); - Map unshreddedFields = - unshreddedFieldNames.stream().collect(Collectors.toMap(name -> name, object::get)); - ByteBuffer unshreddedBuffer = VariantTestUtil.createObject(metadataBuffer, unshreddedFields); - - return record( - fieldType, - Map.of( - "typed_value", - record(shreddedType.asGroupType(), shredded), - "value", - unshreddedBuffer)); - } - - return record(fieldType, Map.of("value", VariantTestUtil.valueBuffer(object))); - } - /** * This is a custom Parquet writer builder that injects a specific Parquet schema and then uses * the Avro object model. This ensures that the Parquet file's schema is exactly what was passed. From df4e4d7f2267dc700e7a6c632eb4cd8844c7c5e5 Mon Sep 17 00:00:00 2001 From: Aihua Xu Date: Fri, 25 Apr 2025 14:12:45 -0700 Subject: [PATCH 10/10] Return Variant null when both value and typed_value are null --- .../java/org/apache/iceberg/parquet/ParquetVariantReaders.java | 3 ++- .../java/org/apache/iceberg/parquet/TestVariantReaders.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java index 67b4de63d0b0..40b0aeecc3b5 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java @@ -366,7 +366,8 @@ public ValueArray read(VariantMetadata metadata) { ValueArray arr = Variants.array(); do { if (column.currentDefinitionLevel() > definitionLevel) { - arr.add(reader.read(metadata)); + VariantValue value = reader.read(metadata); + arr.add(value != null ? value : Variants.ofNull()); } else { // consume the empty list triple for (TripleIterator child : children) { diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java index 81946ed34dea..23c6e9b3282c 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java @@ -1327,7 +1327,7 @@ public void testArrayWithElementNullValueAndNullTypedValue() throws IOException VariantValue actualValue = actualVariant.value(); assertThat(actualValue.type()).isEqualTo(PhysicalType.ARRAY); assertThat(actualValue.asArray().numElements()).isEqualTo(1); - assertThat(actualValue.asArray().get(0)).isNull(); + VariantTestUtil.assertEqual(Variants.ofNull(), actualValue.asArray().get(0)); } @Test