diff --git a/api/src/main/java/org/apache/iceberg/variants/SerializedArray.java b/api/src/main/java/org/apache/iceberg/variants/SerializedArray.java index 619d2fe24ef5..b390d96f31ed 100644 --- a/api/src/main/java/org/apache/iceberg/variants/SerializedArray.java +++ b/api/src/main/java/org/apache/iceberg/variants/SerializedArray.java @@ -85,4 +85,9 @@ public VariantValue get(int index) { public ByteBuffer buffer() { return value; } + + @Override + public String toString() { + return VariantArray.asString(this); + } } diff --git a/api/src/main/java/org/apache/iceberg/variants/VariantArray.java b/api/src/main/java/org/apache/iceberg/variants/VariantArray.java index 07972400d58a..e37781e9d7a3 100644 --- a/api/src/main/java/org/apache/iceberg/variants/VariantArray.java +++ b/api/src/main/java/org/apache/iceberg/variants/VariantArray.java @@ -35,4 +35,23 @@ default PhysicalType type() { default VariantArray asArray() { return this; } + + static String asString(VariantArray arr) { + StringBuilder builder = new StringBuilder(); + + builder.append("VariantArray(["); + boolean first = true; + for (int i = 0; i < arr.numElements(); i++) { + if (first) { + first = false; + } else { + builder.append(", "); + } + + builder.append(arr.get(i)); + } + builder.append("])"); + + return builder.toString(); + } } diff --git a/api/src/test/java/org/apache/iceberg/variants/VariantTestUtil.java b/api/src/test/java/org/apache/iceberg/variants/VariantTestUtil.java index f06f481e6eee..4c77fefe5046 100644 --- a/api/src/test/java/org/apache/iceberg/variants/VariantTestUtil.java +++ b/api/src/test/java/org/apache/iceberg/variants/VariantTestUtil.java @@ -231,14 +231,14 @@ public static ByteBuffer createObject(VariantMetadata metadata, Map 0xFF; int dataSize = 0; - for (Serialized value : values) { + for (VariantValue value : values) { // TODO: produce size for every variant without serializing - dataSize += value.buffer().remaining(); + dataSize += value.sizeInBytes(); } // offset size is the 
size needed to store the length of the data section @@ -260,13 +260,11 @@ static ByteBuffer createArray(Serialized... values) { // write values and offsets int nextOffset = 0; // the first offset is always 0 int index = 0; - for (Serialized value : values) { + for (VariantValue value : values) { // write the offset and value VariantUtil.writeLittleEndianUnsigned( buffer, nextOffset, offsetListOffset + (index * offsetSize), offsetSize); - // in a real implementation, the buffer should be passed to serialize - ByteBuffer valueBuffer = value.buffer(); - int valueSize = writeBufferAbsolute(buffer, dataOffset + nextOffset, valueBuffer); + int valueSize = value.writeTo(buffer, dataOffset + nextOffset); // update next offset and index nextOffset += valueSize; index += 1; diff --git a/core/src/main/java/org/apache/iceberg/variants/ValueArray.java b/core/src/main/java/org/apache/iceberg/variants/ValueArray.java index 3da79bcef106..2c6b29a51f63 100644 --- a/core/src/main/java/org/apache/iceberg/variants/ValueArray.java +++ b/core/src/main/java/org/apache/iceberg/variants/ValueArray.java @@ -127,4 +127,9 @@ private int writeTo(ByteBuffer buffer, int offset) { return (dataOffset - offset) + dataSize; } } + + @Override + public String toString() { + return VariantArray.asString(this); + } } diff --git a/core/src/test/java/org/apache/iceberg/RandomVariants.java b/core/src/test/java/org/apache/iceberg/RandomVariants.java index 7723e3bb9851..52fdf1d966ec 100644 --- a/core/src/test/java/org/apache/iceberg/RandomVariants.java +++ b/core/src/test/java/org/apache/iceberg/RandomVariants.java @@ -27,6 +27,7 @@ import org.apache.iceberg.util.UUIDUtil; import org.apache.iceberg.variants.PhysicalType; import org.apache.iceberg.variants.ShreddedObject; +import org.apache.iceberg.variants.ValueArray; import org.apache.iceberg.variants.Variant; import org.apache.iceberg.variants.VariantMetadata; import org.apache.iceberg.variants.VariantTestUtil; @@ -120,7 +121,13 @@ private static 
VariantValue randomVariant( byte[] uuidBytes = (byte[]) RandomUtil.generatePrimitive(Types.UUIDType.get(), random); return Variants.of(type, UUIDUtil.convert(uuidBytes)); case ARRAY: - // for now, generate an object instead of an array + ValueArray arr = Variants.array(); + int numElements = random.nextInt(10); + for (int i = 0; i < numElements; i += 1) { + arr.add(randomVariant(random, metadata, randomType(random))); + } + + return arr; case OBJECT: ShreddedObject object = Variants.object(metadata); if (metadata.dictionarySize() > 0) { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantUtil.java index ac8f8ef2cfff..3f02b5183b80 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantUtil.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantUtil.java @@ -25,7 +25,11 @@ import java.util.Comparator; import java.util.Deque; import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.iceberg.expressions.PathUtil; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -395,7 +399,30 @@ public Type object(VariantObject object, List names, List typedVal @Override public Type array(VariantArray array, List elementResults) { - return null; + if (elementResults.isEmpty()) { + return null; + } + + // Choose the most common non-null element type as shredding type and build 3-level list + Type defaultType = elementResults.get(0); + Type shredType = + elementResults.stream() + .filter(Objects::nonNull) + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())) + .entrySet() + .stream() + .max(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey) + .orElse(defaultType); + + return list(shredType); + } + + private static 
GroupType list(Type shreddedType) { + GroupType elementType = field("element", shreddedType); + checkField(elementType); + + return Types.optionalList().element(elementType).named("typed_value"); } @Override diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantWriters.java index 6f6d9d3d4e81..9e94b1bbd6cd 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantWriters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantWriters.java @@ -32,6 +32,7 @@ import org.apache.iceberg.variants.PhysicalType; import org.apache.iceberg.variants.ShreddedObject; import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantArray; import org.apache.iceberg.variants.VariantMetadata; import org.apache.iceberg.variants.VariantObject; import org.apache.iceberg.variants.VariantValue; @@ -98,6 +99,17 @@ static ParquetValueWriter objects( builder.build()); } + @SuppressWarnings("unchecked") + public static ParquetValueWriter array( + int repeatedDefinitionLevel, + int repeatedRepetitionLevel, + ParquetValueWriter elementWriter) { + return new ArrayWriter( + repeatedDefinitionLevel, + repeatedRepetitionLevel, + (ParquetValueWriter) elementWriter); + } + private static class VariantWriter implements ParquetValueWriter { private final ParquetValueWriter metadataWriter; private final ParquetValueWriter valueWriter; @@ -360,6 +372,55 @@ public void setColumnStore(ColumnWriteStore columnStore) { } } + private static class ArrayWriter implements TypedWriter { + private final int definitionLevel; + private final int repetitionLevel; + private final ParquetValueWriter writer; + private final List> children; + + protected ArrayWriter( + int definitionLevel, int repetitionLevel, ParquetValueWriter writer) { + this.definitionLevel = definitionLevel; + this.repetitionLevel = repetitionLevel; + this.writer = writer; + this.children = 
writer.columns(); + } + + @Override + public Set types() { + return Set.of(PhysicalType.ARRAY); + } + + @Override + public void write(int parentRepetition, VariantValue value) { + VariantArray arr = value.asArray(); + if (arr.numElements() == 0) { + writeNull(writer, parentRepetition, definitionLevel); + } else { + for (int i = 0; i < arr.numElements(); i++) { + VariantValue element = arr.get(i); + + int rl = repetitionLevel; + if (i == 0) { + rl = parentRepetition; + } + + writer.write(rl, element); + } + } + } + + @Override + public List> columns() { + return children; + } + + @Override + public void setColumnStore(ColumnWriteStore columnStore) { + writer.setColumnStore(columnStore); + } + } + private static void writeNull( ParquetValueWriter writer, int repetitionLevel, int definitionLevel) { for (TripleWriter column : writer.columns()) { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VariantWriterBuilder.java b/parquet/src/main/java/org/apache/iceberg/parquet/VariantWriterBuilder.java index 1b2150b7c929..a447a102690a 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VariantWriterBuilder.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VariantWriterBuilder.java @@ -169,7 +169,15 @@ public ParquetValueWriter object( @Override public ParquetValueWriter array( GroupType array, ParquetValueWriter valueWriter, ParquetValueWriter elementWriter) { - throw new UnsupportedOperationException("Array is not yet supported"); + int valueDL = schema.getMaxDefinitionLevel(path(VALUE)); + int typedDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE)); + int repeatedDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE, LIST)); + int repeatedRL = schema.getMaxRepetitionLevel(path(TYPED_VALUE, LIST)); + + ParquetValueWriter typedWriter = + ParquetVariantWriters.array(repeatedDL, repeatedRL, elementWriter); + + return ParquetVariantWriters.shredded(valueDL, valueWriter, typedDL, typedWriter); } private static class LogicalTypeToVariantWriter diff 
--git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java index cd6ff41e51f8..36ca2cf477ed 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java @@ -41,10 +41,13 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; +import org.apache.iceberg.variants.ValueArray; import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantArray; import org.apache.iceberg.variants.VariantMetadata; import org.apache.iceberg.variants.VariantObject; import org.apache.iceberg.variants.VariantTestUtil; +import org.apache.iceberg.variants.VariantValue; import org.apache.iceberg.variants.Variants; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.FieldSource; @@ -73,6 +76,12 @@ public class TestVariantWriters { "c", Variants.of("string"))); private static final ByteBuffer EMPTY_OBJECT_BUFFER = VariantTestUtil.createObject(TEST_METADATA_BUFFER, ImmutableMap.of()); + private static final ByteBuffer ARRAY_IN_OBJECT_BUFFER = + VariantTestUtil.createObject( + TEST_METADATA_BUFFER, + ImmutableMap.of( + "a", Variants.of(123456789), + "c", array(Variants.of("string"), Variants.of("iceberg")))); private static final VariantMetadata EMPTY_METADATA = Variants.metadata(VariantTestUtil.emptyMetadata()); @@ -83,6 +92,45 @@ public class TestVariantWriters { (VariantObject) Variants.value(TEST_METADATA, SIMILAR_OBJECT_BUFFER); private static final VariantObject EMPTY_OBJECT = (VariantObject) Variants.value(TEST_METADATA, EMPTY_OBJECT_BUFFER); + private static final VariantObject ARRAY_IN_OBJECT = + (VariantObject) Variants.value(TEST_METADATA, ARRAY_IN_OBJECT_BUFFER); + + private static final ByteBuffer 
EMPTY_ARRAY_BUFFER = VariantTestUtil.createArray(); + private static final ByteBuffer TEST_ARRAY_BUFFER = + VariantTestUtil.createArray(Variants.of("iceberg"), Variants.of("string")); + private static final ByteBuffer MIXED_TYPE_ARRAY_BUFFER = + VariantTestUtil.createArray(Variants.of("iceberg"), Variants.of("string"), Variants.of(34)); + private static final ByteBuffer NESTED_ARRAY_BUFFER = + VariantTestUtil.createArray( + array(Variants.of("string"), Variants.of("iceberg")), + array(Variants.of("string"), Variants.of("iceberg"))); + private static final ByteBuffer MIXED_NESTED_ARRAY_BUFFER = + VariantTestUtil.createArray( + array(Variants.of("string"), Variants.of("iceberg"), Variants.of(34)), + array(Variants.of(34), Variants.ofNull()), + array(), + array(Variants.of("string"), Variants.of("iceberg")), + Variants.of(34)); + private static final ByteBuffer OBJECT_IN_ARRAY_BUFFER = + VariantTestUtil.createArray(SIMILAR_OBJECT, SIMILAR_OBJECT); + private static final ByteBuffer MIXED_OBJECT_IN_ARRAY_BUFFER = + VariantTestUtil.createArray( + SIMILAR_OBJECT, SIMILAR_OBJECT, Variants.of("iceberg"), Variants.of(34)); + + private static final VariantArray EMPTY_ARRAY = + (VariantArray) Variants.value(EMPTY_METADATA, EMPTY_ARRAY_BUFFER); + private static final VariantArray TEST_ARRAY = + (VariantArray) Variants.value(EMPTY_METADATA, TEST_ARRAY_BUFFER); + private static final VariantArray MIXED_TYPE_ARRAY = + (VariantArray) Variants.value(EMPTY_METADATA, MIXED_TYPE_ARRAY_BUFFER); + private static final VariantArray NESTED_ARRAY = + (VariantArray) Variants.value(EMPTY_METADATA, NESTED_ARRAY_BUFFER); + private static final VariantArray MIXED_NESTED_ARRAY = + (VariantArray) Variants.value(EMPTY_METADATA, MIXED_NESTED_ARRAY_BUFFER); + private static final VariantArray OBJECT_IN_ARRAY = + (VariantArray) Variants.value(TEST_METADATA, OBJECT_IN_ARRAY_BUFFER); + private static final VariantArray MIXED_OBJECT_IN_ARRAY = + (VariantArray) Variants.value(TEST_METADATA, 
MIXED_OBJECT_IN_ARRAY_BUFFER); private static final Variant[] VARIANTS = new Variant[] { @@ -104,6 +152,14 @@ public class TestVariantWriters { Variant.of(EMPTY_METADATA, EMPTY_OBJECT), Variant.of(TEST_METADATA, TEST_OBJECT), Variant.of(TEST_METADATA, SIMILAR_OBJECT), + Variant.of(TEST_METADATA, ARRAY_IN_OBJECT), + Variant.of(EMPTY_METADATA, EMPTY_ARRAY), + Variant.of(EMPTY_METADATA, TEST_ARRAY), + Variant.of(EMPTY_METADATA, MIXED_TYPE_ARRAY), + Variant.of(EMPTY_METADATA, NESTED_ARRAY), + Variant.of(EMPTY_METADATA, MIXED_NESTED_ARRAY), + Variant.of(TEST_METADATA, OBJECT_IN_ARRAY), + Variant.of(TEST_METADATA, MIXED_OBJECT_IN_ARRAY), Variant.of(EMPTY_METADATA, Variants.ofIsoDate("2024-11-07")), Variant.of(EMPTY_METADATA, Variants.ofIsoDate("1957-11-07")), Variant.of(EMPTY_METADATA, Variants.ofIsoTimestamptz("2024-11-07T12:33:54.123456+00:00")), @@ -200,4 +256,13 @@ private static List writeAndRead( return Lists.newArrayList(reader); } } + + private static ValueArray array(VariantValue... values) { + ValueArray arr = Variants.array(); + for (VariantValue value : values) { + arr.add(value); + } + + return arr; + } }