From e93c910e0b53c1d025b39e7354345fc2ed59d282 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Tue, 18 Feb 2025 15:28:55 -0800 Subject: [PATCH 1/7] Parquet: Implement Variant writers. --- .../data/parquet/BaseParquetWriter.java | 44 +- .../parquet/ParquetVariantWriters.java | 379 ++++++++++++++++++ .../iceberg/parquet/VariantWriterBuilder.java | 279 +++++++++++++ 3 files changed, 692 insertions(+), 10 deletions(-) create mode 100644 parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantWriters.java create mode 100644 parquet/src/main/java/org/apache/iceberg/parquet/VariantWriterBuilder.java diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java index 13ae65d10ab9..92aab005579c 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java @@ -18,13 +18,17 @@ */ package org.apache.iceberg.data.parquet; +import java.util.Arrays; import java.util.List; import java.util.Optional; -import org.apache.iceberg.parquet.ParquetTypeVisitor; import org.apache.iceberg.parquet.ParquetValueWriter; import org.apache.iceberg.parquet.ParquetValueWriters; +import org.apache.iceberg.parquet.ParquetVariantVisitor; +import org.apache.iceberg.parquet.TypeWithSchemaVisitor; +import org.apache.iceberg.parquet.VariantWriterBuilder; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; @@ -34,9 +38,14 @@ abstract class BaseParquetWriter { - @SuppressWarnings("unchecked") protected ParquetValueWriter createWriter(MessageType type) { - return (ParquetValueWriter) 
ParquetTypeVisitor.visit(type, new WriteBuilder(type)); + return createWriter(null, type); + } + + @SuppressWarnings("unchecked") + protected ParquetValueWriter createWriter(Types.StructType struct, MessageType type) { + return (ParquetValueWriter) + TypeWithSchemaVisitor.visit(struct, type, new WriteBuilder(type)); } protected abstract ParquetValueWriters.StructWriter createStructWriter( @@ -62,7 +71,7 @@ protected ParquetValueWriter timestampWriter(ColumnDescriptor desc, boolean i } } - private class WriteBuilder extends ParquetTypeVisitor> { + private class WriteBuilder extends TypeWithSchemaVisitor> { private final MessageType type; private WriteBuilder(MessageType type) { @@ -71,14 +80,14 @@ private WriteBuilder(MessageType type) { @Override public ParquetValueWriter message( - MessageType message, List> fieldWriters) { + Types.StructType struct, MessageType message, List> fieldWriters) { - return struct(message.asGroupType(), fieldWriters); + return struct(struct, message.asGroupType(), fieldWriters); } @Override public ParquetValueWriter struct( - GroupType struct, List> fieldWriters) { + Types.StructType iceberg, GroupType struct, List> fieldWriters) { List fields = struct.getFields(); List> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size()); for (int i = 0; i < fields.size(); i += 1) { @@ -91,7 +100,8 @@ public ParquetValueWriter struct( } @Override - public ParquetValueWriter list(GroupType array, ParquetValueWriter elementWriter) { + public ParquetValueWriter list( + Types.ListType iceberg, GroupType array, ParquetValueWriter elementWriter) { GroupType repeated = array.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -107,7 +117,10 @@ public ParquetValueWriter list(GroupType array, ParquetValueWriter element @Override public ParquetValueWriter map( - GroupType map, ParquetValueWriter keyWriter, ParquetValueWriter valueWriter) { + Types.MapType iceberg, + GroupType map, + ParquetValueWriter keyWriter, + 
ParquetValueWriter valueWriter) { GroupType repeatedKeyValue = map.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -127,7 +140,8 @@ public ParquetValueWriter map( } @Override - public ParquetValueWriter primitive(PrimitiveType primitive) { + public ParquetValueWriter primitive( + org.apache.iceberg.types.Type.PrimitiveType iceberg, PrimitiveType primitive) { ColumnDescriptor desc = type.getColumnDescription(currentPath()); LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation(); if (logicalType != null) { @@ -157,6 +171,16 @@ public ParquetValueWriter primitive(PrimitiveType primitive) { throw new UnsupportedOperationException("Unsupported type: " + primitive); } } + + @Override + public ParquetValueWriter variant(Types.VariantType iVariant, ParquetValueWriter result) { + return result; + } + + @Override + public ParquetVariantVisitor> variantVisitor() { + return new VariantWriterBuilder(type, Arrays.asList(currentPath())); + } } private class LogicalTypeWriterVisitor diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantWriters.java new file mode 100644 index 000000000000..366215a750c0 --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantWriters.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.variants.PhysicalType; +import org.apache.iceberg.variants.ShreddedObject; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantObject; +import org.apache.iceberg.variants.VariantValue; +import org.apache.iceberg.variants.Variants; +import org.apache.parquet.column.ColumnWriteStore; + +class ParquetVariantWriters { + private ParquetVariantWriters() {} + + @SuppressWarnings("unchecked") + static ParquetValueWriter variant( + ParquetValueWriter metadataWriter, ParquetValueWriter valueWriter) { + return new VariantWriter( + (ParquetValueWriter) metadataWriter, + (ParquetValueWriter) valueWriter); + } + + @SuppressWarnings("unchecked") + static ParquetValueWriter metadata(ParquetValueWriter bytesWriter) { + return new VariantMetadataWriter((ParquetValueWriter) bytesWriter); + } + + @SuppressWarnings("unchecked") + static ParquetValueWriter value(ParquetValueWriter bytesWriter) { + return new VariantValueWriter((ParquetValueWriter) bytesWriter); + } + + 
static ParquetValueWriter primitive( + ParquetValueWriter writer, PhysicalType... types) { + return new PrimitiveWriter<>(writer, Sets.immutableEnumSet(Arrays.asList(types))); + } + + @SuppressWarnings("unchecked") + static ParquetValueWriter shredded( + int valueDefinitionLevel, + ParquetValueWriter valueWriter, + int typedDefinitionLevel, + ParquetValueWriter typedWriter) { + return new ShreddedVariantWriter( + valueDefinitionLevel, + (ParquetValueWriter) valueWriter, + typedDefinitionLevel, + (TypedWriter) typedWriter); + } + + @SuppressWarnings("unchecked") + static ParquetValueWriter objects( + int valueDefinitionLevel, + ParquetValueWriter valueWriter, + int typedDefinitionLevel, + List fieldNames, + List> fieldWriters) { + ImmutableMap.Builder> builder = ImmutableMap.builder(); + for (int i = 0; i < fieldNames.size(); i += 1) { + builder.put(fieldNames.get(i), (ParquetValueWriter) fieldWriters.get(i)); + } + + return new ShreddedObjectWriter( + valueDefinitionLevel, + (ParquetValueWriter) valueWriter, + typedDefinitionLevel, + builder.build()); + } + + private static class VariantWriter implements ParquetValueWriter { + private final ParquetValueWriter metadataWriter; + private final ParquetValueWriter valueWriter; + private final List> children; + + private VariantWriter( + ParquetValueWriter metadataWriter, + ParquetValueWriter valueWriter) { + this.metadataWriter = metadataWriter; + this.valueWriter = valueWriter; + this.children = children(metadataWriter, valueWriter); + } + + @Override + public void write(int repetitionLevel, Variant variant) { + metadataWriter.write(repetitionLevel, variant.metadata()); + valueWriter.write(repetitionLevel, variant.value()); + } + + @Override + public List> columns() { + return children; + } + + @Override + public void setColumnStore(ColumnWriteStore columnStore) { + metadataWriter.setColumnStore(columnStore); + valueWriter.setColumnStore(columnStore); + } + } + + private abstract static class VariantBinaryWriter 
implements ParquetValueWriter { + private final ParquetValueWriter bytesWriter; + private ByteBuffer reusedBuffer = ByteBuffer.allocate(2048); + + private VariantBinaryWriter(ParquetValueWriter bytesWriter) { + this.bytesWriter = bytesWriter; + } + + protected abstract int sizeInBytes(T value); + + protected abstract int writeTo(ByteBuffer buffer, int offset, T value); + + @Override + public void write(int repetitionLevel, T value) { + bytesWriter.write(repetitionLevel, serialize(value)); + } + + @Override + public List> columns() { + return bytesWriter.columns(); + } + + @Override + public void setColumnStore(ColumnWriteStore columnStore) { + bytesWriter.setColumnStore(columnStore); + } + + private void ensureCapacity(int requiredSize) { + if (reusedBuffer.capacity() < requiredSize) { + int newCapacity = capacityFor(requiredSize); + this.reusedBuffer = ByteBuffer.allocate(newCapacity).order(ByteOrder.LITTLE_ENDIAN); + } + } + + private ByteBuffer serialize(T value) { + ensureCapacity(sizeInBytes(value)); + int size = writeTo(reusedBuffer, 0, value); + reusedBuffer.position(0); + reusedBuffer.limit(size); + return reusedBuffer; + } + } + + private static class VariantMetadataWriter extends VariantBinaryWriter { + private VariantMetadataWriter(ParquetValueWriter bytesWriter) { + super(bytesWriter); + } + + @Override + protected int sizeInBytes(VariantMetadata metadata) { + return metadata.sizeInBytes(); + } + + @Override + protected int writeTo(ByteBuffer buffer, int offset, VariantMetadata metadata) { + return metadata.writeTo(buffer, offset); + } + } + + private static class VariantValueWriter extends VariantBinaryWriter { + private VariantValueWriter(ParquetValueWriter bytesWriter) { + super(bytesWriter); + } + + @Override + protected int sizeInBytes(VariantValue value) { + return value.sizeInBytes(); + } + + @Override + protected int writeTo(ByteBuffer buffer, int offset, VariantValue value) { + return value.writeTo(buffer, offset); + } + } + + private interface 
TypedWriter extends ParquetValueWriter { + Set types(); + } + + private static class PrimitiveWriter implements TypedWriter { + private final Set types; + private final ParquetValueWriter writer; + + private PrimitiveWriter(ParquetValueWriter writer, Set types) { + this.types = types; + this.writer = writer; + } + + @Override + public Set types() { + return types; + } + + @Override + @SuppressWarnings("unchecked") + public void write(int repetitionLevel, VariantValue value) { + writer.write(repetitionLevel, (T) value.asPrimitive().get()); + } + + @Override + public List> columns() { + return writer.columns(); + } + + @Override + public void setColumnStore(ColumnWriteStore columnStore) { + writer.setColumnStore(columnStore); + } + } + + private static class ShreddedVariantWriter implements ParquetValueWriter { + private final int valueDefinitionLevel; + private final ParquetValueWriter valueWriter; + private final int typedDefinitionLevel; + private final TypedWriter typedWriter; + private final List> children; + + private ShreddedVariantWriter( + int valueDefinitionLevel, + ParquetValueWriter valueWriter, + int typedDefinitionLevel, + TypedWriter typedWriter) { + this.valueDefinitionLevel = valueDefinitionLevel; + this.valueWriter = valueWriter; + this.typedDefinitionLevel = typedDefinitionLevel; + this.typedWriter = typedWriter; + this.children = children(valueWriter, typedWriter); + } + + @Override + public void write(int repetitionLevel, VariantValue value) { + if (typedWriter.types().contains(value.type())) { + typedWriter.write(repetitionLevel, value); + writeNull(valueWriter, repetitionLevel, valueDefinitionLevel); + } else { + valueWriter.write(repetitionLevel, value); + writeNull(typedWriter, repetitionLevel, typedDefinitionLevel); + } + } + + @Override + public List> columns() { + return children; + } + + @Override + public void setColumnStore(ColumnWriteStore columnStore) { + valueWriter.setColumnStore(columnStore); + 
typedWriter.setColumnStore(columnStore); + } + } + + private static class ShreddedObjectWriter implements ParquetValueWriter { + private final int valueDefinitionLevel; + private final ParquetValueWriter valueWriter; + private final int typedDefinitionLevel; + private final Map> typedWriters; + private final List> children; + + private ShreddedObjectWriter( + int valueDefinitionLevel, + ParquetValueWriter valueWriter, + int typedDefinitionLevel, + Map> typedWriters) { + this.valueDefinitionLevel = valueDefinitionLevel; + this.valueWriter = valueWriter; + this.typedDefinitionLevel = typedDefinitionLevel; + this.typedWriters = typedWriters; + this.children = + children( + Iterables.concat( + ImmutableList.of(valueWriter), ImmutableList.copyOf(typedWriters.values()))); + } + + @Override + public void write(int repetitionLevel, VariantValue value) { + if (value.type() != PhysicalType.OBJECT) { + valueWriter.write(repetitionLevel, value); + + // write null for all fields + for (ParquetValueWriter writer : typedWriters.values()) { + writeNull(writer, repetitionLevel, typedDefinitionLevel); + } + + } else { + VariantObject object = value.asObject(); + ShreddedObject shredded = Variants.object(null, object); + for (Map.Entry> entry : typedWriters.entrySet()) { + String fieldName = entry.getKey(); + ParquetValueWriter writer = entry.getValue(); + + VariantValue fieldValue = object.get(fieldName); + if (fieldValue != null) { + // shredded: suppress the field in the object and write it to the value pair + shredded.remove(fieldName); + writer.write(repetitionLevel, fieldValue); + } else { + // missing: write null to both value and typed_value + writeNull(writer, repetitionLevel, typedDefinitionLevel); + } + } + + if (shredded.numFields() > 0) { + // partially shredded: write the unshredded fields + valueWriter.write(repetitionLevel, shredded); + } else { + // completely shredded: omit the empty value + writeNull(valueWriter, repetitionLevel, valueDefinitionLevel); + } + } + } 
+ + @Override + public List> columns() { + return children; + } + + @Override + public void setColumnStore(ColumnWriteStore columnStore) { + valueWriter.setColumnStore(columnStore); + for (ParquetValueWriter fieldWriter : typedWriters.values()) { + fieldWriter.setColumnStore(columnStore); + } + } + } + + private static void writeNull( + ParquetValueWriter writer, int repetitionLevel, int definitionLevel) { + for (TripleWriter column : writer.columns()) { + column.writeNull(repetitionLevel, definitionLevel - 1); + } + } + + private static List> children(ParquetValueWriter... writers) { + return children(Arrays.asList(writers)); + } + + private static List> children(Iterable> writers) { + return ImmutableList.copyOf( + Iterables.concat(Iterables.transform(writers, ParquetValueWriter::columns))); + } + + private static final double LN2 = Math.log(2); + + private static int capacityFor(int valueSize) { + // find the power of 2 size that fits the value + int nextPow2 = (int) Math.ceil(Math.log(valueSize) / LN2); + // return a capacity that is 2 the next power of 2 size up to the max + return Math.min(1 << (nextPow2 + 1), Integer.MAX_VALUE - 1); + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VariantWriterBuilder.java b/parquet/src/main/java/org/apache/iceberg/parquet/VariantWriterBuilder.java new file mode 100644 index 000000000000..14668f6c84bc --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VariantWriterBuilder.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import java.util.Deque; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.variants.PhysicalType; +import org.apache.iceberg.variants.VariantValue; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; + +public class 
VariantWriterBuilder extends ParquetVariantVisitor> { + private final MessageType schema; + private final Iterable basePath; + private final Deque fieldNames = Lists.newLinkedList(); + + public VariantWriterBuilder(MessageType schema, Iterable basePath) { + this.schema = schema; + this.basePath = basePath; + } + + @Override + public void beforeField(Type type) { + fieldNames.addLast(type.getName()); + } + + @Override + public void afterField(Type type) { + fieldNames.removeLast(); + } + + private String[] currentPath() { + return Streams.concat(Streams.stream(basePath), fieldNames.stream()).toArray(String[]::new); + } + + private String[] path(String name) { + return Streams.concat(Streams.stream(basePath), fieldNames.stream(), Stream.of(name)) + .toArray(String[]::new); + } + + @Override + public ParquetValueWriter variant( + GroupType variant, ParquetValueWriter metadataWriter, ParquetValueWriter valueWriter) { + return ParquetVariantWriters.variant(metadataWriter, valueWriter); + } + + @Override + public ParquetValueWriter metadata(PrimitiveType metadata) { + ColumnDescriptor desc = schema.getColumnDescription(currentPath()); + return ParquetVariantWriters.metadata(ParquetValueWriters.byteBuffers(desc)); + } + + @Override + public ParquetValueWriter serialized(PrimitiveType value) { + ColumnDescriptor desc = schema.getColumnDescription(currentPath()); + return ParquetVariantWriters.value(ParquetValueWriters.byteBuffers(desc)); + } + + @Override + public ParquetValueWriter primitive(PrimitiveType primitive) { + ColumnDescriptor desc = schema.getColumnDescription(currentPath()); + LogicalTypeAnnotation annotation = primitive.getLogicalTypeAnnotation(); + if (annotation != null) { + Optional> writer = + annotation.accept(new LogicalTypeToVariantWriter(desc)); + if (writer.isPresent()) { + return writer.get(); + } + + } else { + switch (primitive.getPrimitiveTypeName()) { + case BINARY: + return ParquetVariantWriters.primitive( + 
ParquetValueWriters.byteBuffers(desc), PhysicalType.BINARY); + case BOOLEAN: + return ParquetVariantWriters.primitive( + ParquetValueWriters.booleans(desc), + PhysicalType.BOOLEAN_TRUE, + PhysicalType.BOOLEAN_FALSE); + case INT32: + return ParquetVariantWriters.primitive( + ParquetValueWriters.ints(desc), PhysicalType.INT32); + case INT64: + return ParquetVariantWriters.primitive( + ParquetValueWriters.longs(desc), PhysicalType.INT64); + case FLOAT: + return ParquetVariantWriters.primitive( + ParquetValueWriters.floats(desc), PhysicalType.FLOAT); + case DOUBLE: + return ParquetVariantWriters.primitive( + ParquetValueWriters.doubles(desc), PhysicalType.DOUBLE); + } + } + + throw new UnsupportedOperationException("Unsupported shredded value type: " + primitive); + } + + @Override + public ParquetValueWriter value( + GroupType value, ParquetValueWriter valueWriter, ParquetValueWriter typedWriter) { + int valueDL = schema.getMaxDefinitionLevel(path(VALUE)); + if (typedWriter != null) { + int typedValueDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE)); + return ParquetVariantWriters.shredded(valueDL, valueWriter, typedValueDL, typedWriter); + } else if (value.getType(VALUE).isRepetition(Type.Repetition.OPTIONAL)) { + return ParquetValueWriters.option(value.getType(VALUE), valueDL, valueWriter); + } else { + return valueWriter; + } + } + + @Override + public ParquetValueWriter object( + GroupType object, + ParquetValueWriter valueWriter, + List> fieldWriters) { + int valueDL = schema.getMaxDefinitionLevel(path(VALUE)); + int fieldsDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE)); + + List names = + object.getType(TYPED_VALUE).asGroupType().getFields().stream() + .map(Type::getName) + .collect(Collectors.toList()); + + return ParquetVariantWriters.objects(valueDL, valueWriter, fieldsDL, names, fieldWriters); + } + + @Override + public ParquetValueWriter array( + GroupType array, ParquetValueWriter valueWriter, ParquetValueWriter elementWriter) { + throw new 
UnsupportedOperationException("Array is not yet supported"); + } + + private static class LogicalTypeToVariantWriter + implements LogicalTypeAnnotationVisitor> { + private final ColumnDescriptor desc; + + private LogicalTypeToVariantWriter(ColumnDescriptor desc) { + this.desc = desc; + } + + @Override + public Optional> visit(StringLogicalTypeAnnotation ignored) { + ParquetValueWriter writer = + ParquetVariantWriters.primitive(ParquetValueWriters.strings(desc), PhysicalType.STRING); + return Optional.of(writer); + } + + @Override + public Optional> visit(DecimalLogicalTypeAnnotation decimal) { + ParquetValueWriter writer; + switch (desc.getPrimitiveType().getPrimitiveTypeName()) { + case FIXED_LEN_BYTE_ARRAY: + case BINARY: + writer = + ParquetVariantWriters.primitive( + ParquetValueWriters.decimalAsFixed( + desc, decimal.getPrecision(), decimal.getScale()), + PhysicalType.DECIMAL16); + return Optional.of(writer); + case INT64: + writer = + ParquetVariantWriters.primitive( + ParquetValueWriters.decimalAsLong( + desc, decimal.getPrecision(), decimal.getScale()), + PhysicalType.DECIMAL8); + return Optional.of(writer); + case INT32: + writer = + ParquetVariantWriters.primitive( + ParquetValueWriters.decimalAsInteger( + desc, decimal.getPrecision(), decimal.getScale()), + PhysicalType.DECIMAL4); + return Optional.of(writer); + } + + throw new IllegalArgumentException( + "Invalid primitive type for decimal: " + desc.getPrimitiveType()); + } + + @Override + public Optional> visit(DateLogicalTypeAnnotation ignored) { + ParquetValueWriter writer = + ParquetVariantWriters.primitive(ParquetValueWriters.ints(desc), PhysicalType.DATE); + return Optional.of(writer); + } + + @Override + public Optional> visit(TimeLogicalTypeAnnotation time) { + // ParquetValueWriter writer = + // ParquetVariantWriters.primitive(ParquetValueWriters.longs(desc), PhysicalType.TIME); + return Optional.empty(); + } + + @Override + public Optional> visit(TimestampLogicalTypeAnnotation timestamp) { + 
if (timestamp.getUnit() == TimeUnit.MICROS) { + PhysicalType type = + timestamp.isAdjustedToUTC() ? PhysicalType.TIMESTAMPTZ : PhysicalType.TIMESTAMPNTZ; + ParquetValueWriter writer = + ParquetVariantWriters.primitive(ParquetValueWriters.longs(desc), type); + return Optional.of(writer); + } + + throw new IllegalArgumentException( + "Invalid unit for shredded timestamp: " + timestamp.getUnit()); + } + + @Override + public Optional> visit(IntLogicalTypeAnnotation logical) { + Preconditions.checkArgument( + logical.isSigned(), "Invalid logical type for variant, unsigned: %s", logical); + ParquetValueWriter writer; + switch (logical.getBitWidth()) { + case 8: + writer = + ParquetVariantWriters.primitive( + ParquetValueWriters.tinyints(desc), PhysicalType.INT8); + return Optional.of(writer); + case 16: + writer = + ParquetVariantWriters.primitive(ParquetValueWriters.shorts(desc), PhysicalType.INT16); + return Optional.of(writer); + case 32: + writer = + ParquetVariantWriters.primitive(ParquetValueWriters.ints(desc), PhysicalType.INT32); + return Optional.of(writer); + case 64: + writer = + ParquetVariantWriters.primitive(ParquetValueWriters.longs(desc), PhysicalType.INT64); + return Optional.of(writer); + } + + throw new IllegalArgumentException("Invalid bit width for int: " + logical.getBitWidth()); + } + + @Override + public Optional> visit(UUIDLogicalTypeAnnotation uuidLogicalType) { + // ParquetValueWriter writer = + // ParquetVariantWriters.primitive(ParquetValueWriters.uuids(desc), PhysicalType.UUID); + return Optional.empty(); + } + } +} From d273904d3502b6d3c71d8d71cbf509bede159307 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Thu, 20 Feb 2025 15:20:14 -0800 Subject: [PATCH 2/7] Add tests for Parquet variant writers. 
--- .../iceberg/variants/ShreddedObject.java | 4 +- .../iceberg/variants/VariantVisitor.java | 83 ++++ .../org/apache/iceberg/variants/Variants.java | 10 + .../apache/iceberg/InternalTestHelpers.java | 8 + .../iceberg/data/parquet/InternalWriter.java | 10 +- .../org/apache/iceberg/parquet/Parquet.java | 17 +- .../iceberg/parquet/ParquetSchemaUtil.java | 6 + .../iceberg/parquet/ParquetValueWriters.java | 4 + .../parquet/ParquetVariantWriters.java | 15 +- .../apache/iceberg/parquet/ParquetWriter.java | 9 +- .../iceberg/parquet/TypeToMessageType.java | 57 ++- .../iceberg/parquet/VariantWriterBuilder.java | 19 +- .../iceberg/parquet/TestVariantWriters.java | 361 ++++++++++++++++++ 13 files changed, 575 insertions(+), 28 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/variants/VariantVisitor.java create mode 100644 parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java diff --git a/core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java b/core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java index 1a22a652b0d9..809de747db7f 100644 --- a/core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java +++ b/core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java @@ -46,11 +46,11 @@ public class ShreddedObject implements VariantObject { private SerializationState serializationState = null; ShreddedObject(VariantMetadata metadata) { - this.metadata = metadata; - this.unshredded = null; + this(metadata, null); } ShreddedObject(VariantMetadata metadata, VariantObject unshredded) { + Preconditions.checkArgument(metadata != null, "Invalid metadata: null"); this.metadata = metadata; this.unshredded = unshredded; } diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantVisitor.java b/core/src/main/java/org/apache/iceberg/variants/VariantVisitor.java new file mode 100644 index 000000000000..32e285d3f266 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/VariantVisitor.java @@ 
-0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class VariantVisitor { + public R object(VariantObject object, List fieldResults) { + return null; + } + + public R array(VariantArray array, List elementResults) { + return null; + } + + public R primitive(VariantPrimitive primitive) { + return null; + } + + public void beforeArrayElement(int index) {} + + public void afterArrayElement(int index) {} + + public void beforeObjectField(String fieldName) {} + + public void afterObjectField(String fieldName) {} + + public static R visit(Variant variant, VariantVisitor visitor) { + return visit(variant.value(), visitor); + } + + public static R visit(VariantValue value, VariantVisitor visitor) { + switch (value.type()) { + case ARRAY: + VariantArray array = value.asArray(); + List elementResults = Lists.newArrayList(); + for (int index = 0; index < array.numElements(); index += 1) { + visitor.beforeArrayElement(index); + try { + elementResults.add(visit(array.get(index), visitor)); + } finally { + visitor.afterArrayElement(index); + } + } + + return 
visitor.array(array, elementResults); + + case OBJECT: + VariantObject object = value.asObject(); + List fieldResults = Lists.newArrayList(); + for (String fieldName : object.fieldNames()) { + visitor.beforeObjectField(fieldName); + try { + fieldResults.add(visit(object.get(fieldName), visitor)); + } finally { + visitor.afterObjectField(fieldName); + } + } + + return visitor.object(object, fieldResults); + + default: + return visitor.primitive(value.asPrimitive()); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/Variants.java b/core/src/main/java/org/apache/iceberg/variants/Variants.java index de965b6f9044..886deeb8fb31 100644 --- a/core/src/main/java/org/apache/iceberg/variants/Variants.java +++ b/core/src/main/java/org/apache/iceberg/variants/Variants.java @@ -45,6 +45,16 @@ public static ShreddedObject object(VariantMetadata metadata) { return new ShreddedObject(metadata); } + public static ShreddedObject object(VariantObject object) { + if (object instanceof ShreddedObject) { + return new ShreddedObject(((ShreddedObject) object).metadata(), object); + } else if (object instanceof SerializedObject) { + return new ShreddedObject(((SerializedObject) object).metadata(), object); + } + + throw new UnsupportedOperationException("Metadata is required for object: " + object); + } + public static VariantPrimitive of(PhysicalType type, T value) { return new PrimitiveWrapper<>(type, value); } diff --git a/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java b/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java index 3894298b7fc4..781051f11d7b 100644 --- a/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java +++ b/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java @@ -25,6 +25,8 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantTestUtil; public class 
InternalTestHelpers { @@ -106,6 +108,12 @@ private static void assertEquals(Type type, Object expected, Object actual) { assertThat(actual).as("Actual should be a Map").isInstanceOf(Map.class); assertEquals(type.asMapType(), (Map) expected, (Map) actual); break; + case VARIANT: + assertThat(expected).as("Expected should be a Variant").isInstanceOf(Variant.class); + assertThat(actual).as("Actual should be a Variant").isInstanceOf(Variant.class); + VariantTestUtil.assertEqual(((Variant) expected).metadata(), ((Variant) actual).metadata()); + VariantTestUtil.assertEqual(((Variant) expected).value(), ((Variant) actual).value()); + break; default: throw new IllegalArgumentException("Not a supported type: " + type); } diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java index b42f07ce18ce..79e4989efbeb 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java @@ -24,6 +24,7 @@ import org.apache.iceberg.parquet.ParquetValueWriters; import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter; import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.MessageType; @@ -38,9 +39,14 @@ public class InternalWriter extends BaseParquetWriter { private InternalWriter() {} - @SuppressWarnings("unchecked") public static ParquetValueWriter create(MessageType type) { - return (ParquetValueWriter) INSTANCE.createWriter(type); + return create(null, type); + } + + @SuppressWarnings("unchecked") + public static ParquetValueWriter create( + Types.StructType struct, MessageType type) { + return (ParquetValueWriter) INSTANCE.createWriter(struct, type); } @Override diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index ae25305629ef..8bd9dba9d3a2 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -120,6 +120,7 @@ import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -160,9 +161,10 @@ public static class WriteBuilder implements InternalData.WriteBuilder { private final Map metadata = Maps.newLinkedHashMap(); private final Map config = Maps.newLinkedHashMap(); private Schema schema = null; + private BiFunction variantShreddingFunc = null; private String name = "table"; private WriteSupport writeSupport = null; - private Function> createWriterFunc = null; + private BiFunction> createWriterFunc = null; private MetricsConfig metricsConfig = MetricsConfig.getDefault(); private ParquetFileWriter.Mode writeMode = ParquetFileWriter.Mode.CREATE; private WriterVersion writerVersion = WriterVersion.PARQUET_1_0; @@ -192,6 +194,11 @@ public WriteBuilder schema(Schema newSchema) { return this; } + public WriteBuilder variantShreddingFunc(BiFunction func) { + this.variantShreddingFunc = func; + return this; + } + @Override public WriteBuilder named(String newName) { this.name = newName; @@ -222,7 +229,9 @@ public WriteBuilder meta(String property, String value) { public WriteBuilder createWriterFunc( Function> newCreateWriterFunc) { - this.createWriterFunc = newCreateWriterFunc; + if (newCreateWriterFunc != null) { + this.createWriterFunc = (icebergSchema, type) -> newCreateWriterFunc.apply(type); + } return this; } @@ -292,6 +301,7 @@ private void setBloomFilterConfig( Map fieldIdToParquetPath = parquetSchema.getColumns().stream() + .filter(col -> col.getPrimitiveType().getId() != null) .collect( Collectors.toMap( col -> 
col.getPrimitiveType().getId().intValue(), @@ -362,7 +372,7 @@ public FileAppender build() throws IOException { } set("parquet.avro.write-old-list-structure", "false"); - MessageType type = ParquetSchemaUtil.convert(schema, name); + MessageType type = ParquetSchemaUtil.convert(schema, name, variantShreddingFunc); FileEncryptionProperties fileEncryptionProperties = null; if (fileEncryptionKey != null) { @@ -406,6 +416,7 @@ public FileAppender build() throws IOException { conf, file, schema, + type, rowGroupSize, metadata, createWriterFunc, diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java index 68a9aa979fdf..31a137cf85c7 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import java.util.function.Function; import org.apache.iceberg.Schema; import org.apache.iceberg.mapping.NameMapping; @@ -42,6 +43,11 @@ public static MessageType convert(Schema schema, String name) { return new TypeToMessageType().convert(schema, name); } + public static MessageType convert( + Schema schema, String name, BiFunction variantShreddingFunc) { + return new TypeToMessageType(variantShreddingFunc).convert(schema, name); + } + /** * Converts a Parquet schema to an Iceberg schema. Fields without IDs are kept and assigned * fallback IDs. 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java index 1a7ebe0767d8..f6b729f9f25f 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java @@ -71,6 +71,10 @@ public static UnboxedWriter shorts(ColumnDescriptor desc) { return new ShortWriter(desc); } + public static ParquetValueWriter unboxed(ColumnDescriptor desc) { + return new UnboxedWriter<>(desc); + } + public static UnboxedWriter ints(ColumnDescriptor desc) { return new UnboxedWriter<>(desc); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantWriters.java index 366215a750c0..330b349b3003 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantWriters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantWriters.java @@ -81,6 +81,7 @@ static ParquetValueWriter objects( int valueDefinitionLevel, ParquetValueWriter valueWriter, int typedDefinitionLevel, + int fieldDefinitionLevel, List fieldNames, List> fieldWriters) { ImmutableMap.Builder> builder = ImmutableMap.builder(); @@ -92,6 +93,7 @@ static ParquetValueWriter objects( valueDefinitionLevel, (ParquetValueWriter) valueWriter, typedDefinitionLevel, + fieldDefinitionLevel, builder.build()); } @@ -128,7 +130,7 @@ public void setColumnStore(ColumnWriteStore columnStore) { private abstract static class VariantBinaryWriter implements ParquetValueWriter { private final ParquetValueWriter bytesWriter; - private ByteBuffer reusedBuffer = ByteBuffer.allocate(2048); + private ByteBuffer reusedBuffer = ByteBuffer.allocate(2048).order(ByteOrder.LITTLE_ENDIAN); private VariantBinaryWriter(ParquetValueWriter bytesWriter) { this.bytesWriter = bytesWriter; @@ -157,6 +159,8 @@ private void ensureCapacity(int 
requiredSize) { if (reusedBuffer.capacity() < requiredSize) { int newCapacity = capacityFor(requiredSize); this.reusedBuffer = ByteBuffer.allocate(newCapacity).order(ByteOrder.LITTLE_ENDIAN); + } else { + reusedBuffer.limit(requiredSize); } } @@ -282,6 +286,7 @@ private static class ShreddedObjectWriter implements ParquetValueWriter valueWriter; private final int typedDefinitionLevel; + private final int fieldDefinitionLevel; private final Map> typedWriters; private final List> children; @@ -289,10 +294,12 @@ private ShreddedObjectWriter( int valueDefinitionLevel, ParquetValueWriter valueWriter, int typedDefinitionLevel, + int fieldDefinitionLevel, Map> typedWriters) { this.valueDefinitionLevel = valueDefinitionLevel; this.valueWriter = valueWriter; this.typedDefinitionLevel = typedDefinitionLevel; + this.fieldDefinitionLevel = fieldDefinitionLevel; this.typedWriters = typedWriters; this.children = children( @@ -305,14 +312,14 @@ public void write(int repetitionLevel, VariantValue value) { if (value.type() != PhysicalType.OBJECT) { valueWriter.write(repetitionLevel, value); - // write null for all fields + // write null for the typed_value group for (ParquetValueWriter writer : typedWriters.values()) { writeNull(writer, repetitionLevel, typedDefinitionLevel); } } else { VariantObject object = value.asObject(); - ShreddedObject shredded = Variants.object(null, object); + ShreddedObject shredded = Variants.object(object); for (Map.Entry> entry : typedWriters.entrySet()) { String fieldName = entry.getKey(); ParquetValueWriter writer = entry.getValue(); @@ -324,7 +331,7 @@ public void write(int repetitionLevel, VariantValue value) { writer.write(repetitionLevel, fieldValue); } else { // missing: write null to both value and typed_value - writeNull(writer, repetitionLevel, typedDefinitionLevel); + writeNull(writer, repetitionLevel, fieldDefinitionLevel); } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java index 099cffc33bb8..b808734744a9 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java @@ -23,7 +23,7 @@ import java.io.UncheckedIOException; import java.util.List; import java.util.Map; -import java.util.function.Function; +import java.util.function.BiFunction; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.Metrics; import org.apache.iceberg.MetricsConfig; @@ -75,9 +75,10 @@ class ParquetWriter implements FileAppender, Closeable { Configuration conf, OutputFile output, Schema schema, + MessageType parquetSchema, long rowGroupSize, Map metadata, - Function> createWriterFunc, + BiFunction> createWriterFunc, CompressionCodecName codec, ParquetProperties properties, MetricsConfig metricsConfig, @@ -88,8 +89,8 @@ class ParquetWriter implements FileAppender, Closeable { this.metadata = ImmutableMap.copyOf(metadata); this.compressor = new ParquetCodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec); - this.parquetSchema = ParquetSchemaUtil.convert(schema, "table"); - this.model = (ParquetValueWriter) createWriterFunc.apply(parquetSchema); + this.parquetSchema = parquetSchema; + this.model = (ParquetValueWriter) createWriterFunc.apply(schema, parquetSchema); this.metricsConfig = metricsConfig; this.columnIndexTruncateLength = conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java index cb40693e24a4..1a65c26c0f54 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java @@ -26,8 +26,10 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; import static 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import java.util.function.BiFunction; import org.apache.iceberg.Schema; import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Type.NestedType; import org.apache.iceberg.types.Type.PrimitiveType; import org.apache.iceberg.types.TypeUtil; @@ -56,6 +58,19 @@ public class TypeToMessageType { LogicalTypeAnnotation.timestampType(false /* not adjusted to UTC */, TimeUnit.MICROS); private static final LogicalTypeAnnotation TIMESTAMPTZ_MICROS = LogicalTypeAnnotation.timestampType(true /* adjusted to UTC */, TimeUnit.MICROS); + private static final String METADATA = "metadata"; + private static final String VALUE = "value"; + private static final String TYPED_VALUE = "typed_value"; + + private final BiFunction variantShreddingFunc; + + public TypeToMessageType() { + this.variantShreddingFunc = null; + } + + public TypeToMessageType(BiFunction variantShreddingFunc) { + this.variantShreddingFunc = variantShreddingFunc; + } public MessageType convert(Schema schema, String name) { Types.MessageTypeBuilder builder = Types.buildMessage(); @@ -122,13 +137,41 @@ public GroupType map(MapType map, Type.Repetition repetition, int id, String nam public Type variant(Type.Repetition repetition, int id, String originalName) { String name = AvroSchemaUtil.makeCompatibleName(originalName); - return Types.buildGroup(repetition) - .id(id) - .required(BINARY) - .named("metadata") - .required(BINARY) - .named("value") - .named(name); + Type shreddedType; + if (variantShreddingFunc != null) { + shreddedType = variantShreddingFunc.apply(id, originalName); + } else { + shreddedType = null; + } + + if (shreddedType != null) { + Preconditions.checkArgument( + shreddedType.getName().equals(TYPED_VALUE), + "Invalid shredded type name: %s should be typed_value", + shreddedType.getName()); + Preconditions.checkArgument( + 
shreddedType.isRepetition(Type.Repetition.OPTIONAL), + "Invalid shredded type repetition: %s should be OPTIONAL", + shreddedType.getRepetition()); + + return Types.buildGroup(repetition) + .id(id) + .required(BINARY) + .named(METADATA) + .optional(BINARY) + .named(VALUE) + .addField(shreddedType) + .named(name); + + } else { + return Types.buildGroup(repetition) + .id(id) + .required(BINARY) + .named(METADATA) + .required(BINARY) + .named(VALUE) + .named(name); + } } public Type primitive( diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VariantWriterBuilder.java b/parquet/src/main/java/org/apache/iceberg/parquet/VariantWriterBuilder.java index 14668f6c84bc..afe2907a4091 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VariantWriterBuilder.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VariantWriterBuilder.java @@ -68,8 +68,8 @@ private String[] currentPath() { return Streams.concat(Streams.stream(basePath), fieldNames.stream()).toArray(String[]::new); } - private String[] path(String name) { - return Streams.concat(Streams.stream(basePath), fieldNames.stream(), Stream.of(name)) + private String[] path(String... 
names) { + return Streams.concat(Streams.stream(basePath), fieldNames.stream(), Stream.of(names)) .toArray(String[]::new); } @@ -119,11 +119,13 @@ public ParquetValueWriter primitive(PrimitiveType primitive) { return ParquetVariantWriters.primitive( ParquetValueWriters.longs(desc), PhysicalType.INT64); case FLOAT: + // use an unboxed writer to skip metrics collection that requires an ID return ParquetVariantWriters.primitive( - ParquetValueWriters.floats(desc), PhysicalType.FLOAT); + ParquetValueWriters.unboxed(desc), PhysicalType.FLOAT); case DOUBLE: + // use an unboxed writer to skip metrics collection that requires an ID return ParquetVariantWriters.primitive( - ParquetValueWriters.doubles(desc), PhysicalType.DOUBLE); + ParquetValueWriters.unboxed(desc), PhysicalType.DOUBLE); } } @@ -150,14 +152,19 @@ public ParquetValueWriter object( ParquetValueWriter valueWriter, List> fieldWriters) { int valueDL = schema.getMaxDefinitionLevel(path(VALUE)); - int fieldsDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE)); + int typedDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE)); + GroupType firstField = object.getType(TYPED_VALUE).asGroupType().getType(0).asGroupType(); + int fieldDL = + schema.getMaxDefinitionLevel( + path(TYPED_VALUE, firstField.getName(), firstField.getType(0).getName())); List names = object.getType(TYPED_VALUE).asGroupType().getFields().stream() .map(Type::getName) .collect(Collectors.toList()); - return ParquetVariantWriters.objects(valueDL, valueWriter, fieldsDL, names, fieldWriters); + return ParquetVariantWriters.objects( + valueDL, valueWriter, typedDL, fieldDL, names, fieldWriters); } @Override diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java new file mode 100644 index 000000000000..1ae92b3ff1bd --- /dev/null +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java @@ -0,0 +1,361 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.iceberg.InternalTestHelpers; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.InternalReader; +import org.apache.iceberg.data.parquet.InternalWriter; +import org.apache.iceberg.inmemory.InMemoryOutputFile; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import 
org.apache.iceberg.types.Types; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantArray; +import org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantObject; +import org.apache.iceberg.variants.VariantPrimitive; +import org.apache.iceberg.variants.VariantTestUtil; +import org.apache.iceberg.variants.VariantValue; +import org.apache.iceberg.variants.VariantVisitor; +import org.apache.iceberg.variants.Variants; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types.GroupBuilder; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.FieldSource; + +public class TestVariantWriters { + private static final Schema SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "var", Types.VariantType.get())); + + private static final GenericRecord RECORD = GenericRecord.create(SCHEMA); + + private static final ByteBuffer TEST_METADATA_BUFFER = + VariantTestUtil.createMetadata(ImmutableList.of("a", "b", "c", "d", "e"), true); + private static final ByteBuffer TEST_OBJECT_BUFFER = + VariantTestUtil.createObject( + TEST_METADATA_BUFFER, + ImmutableMap.of( + "a", Variants.ofNull(), + "d", Variants.of("iceberg"))); + private static final ByteBuffer SIMILAR_OBJECT_BUFFER = + VariantTestUtil.createObject( + TEST_METADATA_BUFFER, + ImmutableMap.of( + "a", Variants.of(123456789), + "c", Variants.of("string"))); + private static final ByteBuffer EMPTY_OBJECT_BUFFER = + VariantTestUtil.createObject(TEST_METADATA_BUFFER, ImmutableMap.of()); + + private static final VariantMetadata EMPTY_METADATA = + Variants.metadata(VariantTestUtil.emptyMetadata()); + private static final VariantMetadata TEST_METADATA = Variants.metadata(TEST_METADATA_BUFFER); 
+ private static final VariantObject TEST_OBJECT = + (VariantObject) Variants.value(TEST_METADATA, TEST_OBJECT_BUFFER); + private static final VariantObject SIMILAR_OBJECT = + (VariantObject) Variants.value(TEST_METADATA, SIMILAR_OBJECT_BUFFER); + private static final VariantObject EMPTY_OBJECT = + (VariantObject) Variants.value(TEST_METADATA, EMPTY_OBJECT_BUFFER); + + private static final Variant[] VARIANTS = + new Variant[] { + variantOf(EMPTY_METADATA, Variants.ofNull()), + variantOf(EMPTY_METADATA, Variants.of(true)), + variantOf(EMPTY_METADATA, Variants.of(false)), + variantOf(EMPTY_METADATA, Variants.of((byte) 34)), + variantOf(EMPTY_METADATA, Variants.of((byte) -34)), + variantOf(EMPTY_METADATA, Variants.of((short) 1234)), + variantOf(EMPTY_METADATA, Variants.of((short) -1234)), + variantOf(EMPTY_METADATA, Variants.of(12345)), + variantOf(EMPTY_METADATA, Variants.of(-12345)), + variantOf(EMPTY_METADATA, Variants.of(9876543210L)), + variantOf(EMPTY_METADATA, Variants.of(-9876543210L)), + variantOf(EMPTY_METADATA, Variants.of(10.11F)), + variantOf(EMPTY_METADATA, Variants.of(-10.11F)), + variantOf(EMPTY_METADATA, Variants.of(14.3D)), + variantOf(EMPTY_METADATA, Variants.of(-14.3D)), + variantOf(EMPTY_METADATA, EMPTY_OBJECT), + variantOf(TEST_METADATA, TEST_OBJECT), + variantOf(TEST_METADATA, SIMILAR_OBJECT), + variantOf(EMPTY_METADATA, Variants.ofIsoDate("2024-11-07")), + variantOf(EMPTY_METADATA, Variants.ofIsoDate("1957-11-07")), + variantOf(EMPTY_METADATA, Variants.ofIsoTimestamptz("2024-11-07T12:33:54.123456+00:00")), + variantOf(EMPTY_METADATA, Variants.ofIsoTimestamptz("1957-11-07T12:33:54.123456+00:00")), + variantOf(EMPTY_METADATA, Variants.ofIsoTimestampntz("2024-11-07T12:33:54.123456")), + variantOf(EMPTY_METADATA, Variants.ofIsoTimestampntz("1957-11-07T12:33:54.123456")), + variantOf(EMPTY_METADATA, Variants.of(new BigDecimal("123456.789"))), // decimal4 + variantOf(EMPTY_METADATA, Variants.of(new BigDecimal("-123456.789"))), // decimal4 + 
variantOf(EMPTY_METADATA, Variants.of(new BigDecimal("123456789.987654321"))), // decimal8 + variantOf(EMPTY_METADATA, Variants.of(new BigDecimal("-123456789.987654321"))), // decimal8 + variantOf(EMPTY_METADATA, Variants.of(new BigDecimal("9876543210.123456789"))), // decimal16 + variantOf( + EMPTY_METADATA, Variants.of(new BigDecimal("-9876543210.123456789"))), // decimal16 + variantOf( + EMPTY_METADATA, Variants.of(ByteBuffer.wrap(new byte[] {0x0a, 0x0b, 0x0c, 0x0d}))), + variantOf(EMPTY_METADATA, Variants.of("iceberg")), + }; + + @ParameterizedTest + @FieldSource("VARIANTS") + public void testUnshreddedValues(Variant variant) throws IOException { + Record record = RECORD.copy("id", 1, "var", variant); + + Record actual = writeAndRead((id, name) -> null, record); + + InternalTestHelpers.assertEquals(SCHEMA.asStruct(), record, actual); + } + + @ParameterizedTest + @FieldSource("VARIANTS") + public void testShreddedValues(Variant variant) throws IOException { + Record record = RECORD.copy("id", 1, "var", variant); + + Record actual = writeAndRead((id, name) -> toParquetSchema(variant.value()), record); + + InternalTestHelpers.assertEquals(SCHEMA.asStruct(), record, actual); + } + + @ParameterizedTest + @FieldSource("VARIANTS") + public void testMixedShredding(Variant variant) throws IOException { + List expected = + IntStream.range(0, VARIANTS.length) + .mapToObj(i -> RECORD.copy("id", i, "var", VARIANTS[i])) + .collect(Collectors.toList()); + + List actual = writeAndRead((id, name) -> toParquetSchema(variant.value()), expected); + + assertThat(actual.size()).isEqualTo(expected.size()); + + for (int i = 0; i < expected.size(); i += 1) { + InternalTestHelpers.assertEquals(SCHEMA.asStruct(), expected.get(i), actual.get(i)); + } + } + + private static Record writeAndRead(BiFunction shreddingFunc, Record record) + throws IOException { + return Iterables.getOnlyElement(writeAndRead(shreddingFunc, List.of(record))); + } + + private static List writeAndRead( + BiFunction 
shreddingFunc, List records) throws IOException { + OutputFile outputFile = new InMemoryOutputFile(); + + try (FileAppender writer = + Parquet.write(outputFile) + .schema(SCHEMA) + .variantShreddingFunc(shreddingFunc) + .createWriterFunc(fileSchema -> InternalWriter.create(SCHEMA.asStruct(), fileSchema)) + .build()) { + for (Record record : records) { + writer.add(record); + } + } + + try (CloseableIterable reader = + Parquet.read(outputFile.toInputFile()) + .project(SCHEMA) + .createReaderFunc(fileSchema -> InternalReader.create(SCHEMA, fileSchema)) + .build()) { + return Lists.newArrayList(reader); + } + } + + private static Variant variantOf(VariantMetadata metadata, VariantValue value) { + return new Variant() { + @Override + public VariantMetadata metadata() { + return metadata; + } + + @Override + public VariantValue value() { + return value; + } + + @Override + public String toString() { + return "Variant(metadata=" + metadata + ", value=" + value + ")"; + } + }; + } + + private Type toParquetSchema(VariantValue value) { + return VariantVisitor.visit(value, new ParquetSchemaProducer()); + } + + private static class ParquetSchemaProducer extends VariantVisitor { + @Override + public Type object(VariantObject object, List typedValues) { + if (object.numFields() < 1) { + // Parquet cannot write typed_value group with no fields + return null; + } + + List fields = Lists.newArrayList(); + int index = 0; + for (String name : object.fieldNames()) { + Type typedValue = typedValues.get(index); + fields.add(field(name, typedValue)); + index += 1; + } + + return objectFields(fields); + } + + @Override + public Type array(VariantArray array, List elementResults) { + throw new UnsupportedOperationException("Array shredding is not supported"); + } + + @Override + public Type primitive(VariantPrimitive primitive) { + switch (primitive.type()) { + case NULL: + return null; + case BOOLEAN_TRUE: + case BOOLEAN_FALSE: + return shreddedPrimitive(PrimitiveType.PrimitiveTypeName.BOOLEAN); + case INT8: + return shreddedPrimitive( +
PrimitiveType.PrimitiveTypeName.INT32, LogicalTypeAnnotation.intType(8)); + case INT16: + return shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, LogicalTypeAnnotation.intType(16)); + case INT32: + return shreddedPrimitive(PrimitiveType.PrimitiveTypeName.INT32); + case INT64: + return shreddedPrimitive(PrimitiveType.PrimitiveTypeName.INT64); + case FLOAT: + return shreddedPrimitive(PrimitiveType.PrimitiveTypeName.FLOAT); + case DOUBLE: + return shreddedPrimitive(PrimitiveType.PrimitiveTypeName.DOUBLE); + case DECIMAL4: + BigDecimal decimal4 = (BigDecimal) primitive.get(); + return shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, + LogicalTypeAnnotation.decimalType(decimal4.scale(), 9)); + case DECIMAL8: + BigDecimal decimal8 = (BigDecimal) primitive.get(); + return shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT64, + LogicalTypeAnnotation.decimalType(decimal8.scale(), 18)); + case DECIMAL16: + BigDecimal decimal16 = (BigDecimal) primitive.get(); + return shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, + LogicalTypeAnnotation.decimalType(decimal16.scale(), 38)); + case DATE: + return shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, LogicalTypeAnnotation.dateType()); + case TIMESTAMPTZ: + return shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT64, + LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS)); + case TIMESTAMPNTZ: + return shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT64, + LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS)); + case BINARY: + return shreddedPrimitive(PrimitiveType.PrimitiveTypeName.BINARY); + case STRING: + return shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, LogicalTypeAnnotation.stringType()); + } + + throw new UnsupportedOperationException("Unsupported shredding type: " + primitive.type()); + } + + private static GroupType objectFields(List fields) { + GroupBuilder builder = + 
org.apache.parquet.schema.Types.buildGroup(Type.Repetition.OPTIONAL); + for (GroupType field : fields) { + checkField(field); + builder.addField(field); + } + + return builder.named("typed_value"); + } + + private static void checkField(GroupType fieldType) { + Preconditions.checkArgument( + fieldType.isRepetition(Type.Repetition.REQUIRED), + "Invalid field type repetition: %s should be REQUIRED", + fieldType.getRepetition()); + } + + private static GroupType field(String name, Type shreddedType) { + GroupBuilder builder = + org.apache.parquet.schema.Types.buildGroup(Type.Repetition.REQUIRED) + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .named("value"); + + if (shreddedType != null) { + checkShreddedType(shreddedType); + builder.addField(shreddedType); + } + + return builder.named(name); + } + + private static void checkShreddedType(Type shreddedType) { + Preconditions.checkArgument( + shreddedType.getName().equals("typed_value"), + "Invalid shredded type name: %s should be typed_value", + shreddedType.getName()); + Preconditions.checkArgument( + shreddedType.isRepetition(Type.Repetition.OPTIONAL), + "Invalid shredded type repetition: %s should be OPTIONAL", + shreddedType.getRepetition()); + } + + private static Type shreddedPrimitive(PrimitiveType.PrimitiveTypeName primitive) { + return org.apache.parquet.schema.Types.optional(primitive).named("typed_value"); + } + + private static Type shreddedPrimitive( + PrimitiveType.PrimitiveTypeName primitive, LogicalTypeAnnotation annotation) { + return org.apache.parquet.schema.Types.optional(primitive) + .as(annotation) + .named("typed_value"); + } + } +} From 7feb172a6ddf21f3aaf8eb369d7de1fcf4226a94 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Thu, 20 Feb 2025 16:18:43 -0800 Subject: [PATCH 3/7] Add VariantData implementation of Variant. 
--- .../org/apache/iceberg/variants/Variant.java | 12 +-- .../apache/iceberg/variants/VariantData.java | 43 ++++++++++ .../parquet/ParquetVariantReaders.java | 26 +----- .../iceberg/parquet/TestVariantWriters.java | 84 ++++++++----------- 4 files changed, 78 insertions(+), 87 deletions(-) create mode 100644 api/src/main/java/org/apache/iceberg/variants/VariantData.java diff --git a/api/src/main/java/org/apache/iceberg/variants/Variant.java b/api/src/main/java/org/apache/iceberg/variants/Variant.java index 4376938e9e15..4ea462f7256d 100644 --- a/api/src/main/java/org/apache/iceberg/variants/Variant.java +++ b/api/src/main/java/org/apache/iceberg/variants/Variant.java @@ -29,17 +29,7 @@ public interface Variant { VariantValue value(); static Variant of(VariantMetadata metadata, VariantValue value) { - return new Variant() { - @Override - public VariantMetadata metadata() { - return metadata; - } - - @Override - public VariantValue value() { - return value; - } - }; + return new VariantData(metadata, value); } static Variant from(ByteBuffer buffer) { diff --git a/api/src/main/java/org/apache/iceberg/variants/VariantData.java b/api/src/main/java/org/apache/iceberg/variants/VariantData.java new file mode 100644 index 000000000000..20d8464c5163 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/variants/VariantData.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +class VariantData implements Variant { + private final VariantMetadata metadata; + private final VariantValue value; + + VariantData(VariantMetadata metadata, VariantValue value) { + Preconditions.checkArgument(metadata != null, "Invalid variant metadata: null"); + Preconditions.checkArgument(value != null, "Invalid variant value: null"); + this.metadata = metadata; + this.value = value; + } + + @Override + public VariantMetadata metadata() { + return metadata; + } + + @Override + public VariantValue value() { + return value; + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java index b0e132afad5f..3e5635958c0a 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java @@ -351,31 +351,7 @@ private VariantReader( public Variant read(Variant ignored) { VariantMetadata metadata = metadataReader.read(null); VariantValue value = valueReader.read(metadata); - if (value == MISSING) { - return new Variant() { - @Override - public VariantMetadata metadata() { - return metadata; - } - - @Override - public VariantValue value() { - return Variants.ofNull(); - } - }; - } - - return new Variant() { - @Override - public VariantMetadata metadata() { - return metadata; - } - - @Override - public VariantValue value() 
{ - return value; - } - }; + return Variant.of(metadata, value != MISSING ? value : Variants.ofNull()); } @Override diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java index 1ae92b3ff1bd..3ad574ce787d 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java @@ -97,40 +97,41 @@ public class TestVariantWriters { private static final Variant[] VARIANTS = new Variant[] { - variantOf(EMPTY_METADATA, Variants.ofNull()), - variantOf(EMPTY_METADATA, Variants.of(true)), - variantOf(EMPTY_METADATA, Variants.of(false)), - variantOf(EMPTY_METADATA, Variants.of((byte) 34)), - variantOf(EMPTY_METADATA, Variants.of((byte) -34)), - variantOf(EMPTY_METADATA, Variants.of((short) 1234)), - variantOf(EMPTY_METADATA, Variants.of((short) -1234)), - variantOf(EMPTY_METADATA, Variants.of(12345)), - variantOf(EMPTY_METADATA, Variants.of(-12345)), - variantOf(EMPTY_METADATA, Variants.of(9876543210L)), - variantOf(EMPTY_METADATA, Variants.of(-9876543210L)), - variantOf(EMPTY_METADATA, Variants.of(10.11F)), - variantOf(EMPTY_METADATA, Variants.of(-10.11F)), - variantOf(EMPTY_METADATA, Variants.of(14.3D)), - variantOf(EMPTY_METADATA, Variants.of(-14.3D)), - variantOf(EMPTY_METADATA, EMPTY_OBJECT), - variantOf(TEST_METADATA, TEST_OBJECT), - variantOf(TEST_METADATA, SIMILAR_OBJECT), - variantOf(EMPTY_METADATA, Variants.ofIsoDate("2024-11-07")), - variantOf(EMPTY_METADATA, Variants.ofIsoDate("1957-11-07")), - variantOf(EMPTY_METADATA, Variants.ofIsoTimestamptz("2024-11-07T12:33:54.123456+00:00")), - variantOf(EMPTY_METADATA, Variants.ofIsoTimestamptz("1957-11-07T12:33:54.123456+00:00")), - variantOf(EMPTY_METADATA, Variants.ofIsoTimestampntz("2024-11-07T12:33:54.123456")), - variantOf(EMPTY_METADATA, Variants.ofIsoTimestampntz("1957-11-07T12:33:54.123456")), - 
variantOf(EMPTY_METADATA, Variants.of(new BigDecimal("123456.789"))), // decimal4 - variantOf(EMPTY_METADATA, Variants.of(new BigDecimal("-123456.789"))), // decimal4 - variantOf(EMPTY_METADATA, Variants.of(new BigDecimal("123456789.987654321"))), // decimal8 - variantOf(EMPTY_METADATA, Variants.of(new BigDecimal("-123456789.987654321"))), // decimal8 - variantOf(EMPTY_METADATA, Variants.of(new BigDecimal("9876543210.123456789"))), // decimal16 - variantOf( + Variant.of(EMPTY_METADATA, Variants.ofNull()), + Variant.of(EMPTY_METADATA, Variants.of(true)), + Variant.of(EMPTY_METADATA, Variants.of(false)), + Variant.of(EMPTY_METADATA, Variants.of((byte) 34)), + Variant.of(EMPTY_METADATA, Variants.of((byte) -34)), + Variant.of(EMPTY_METADATA, Variants.of((short) 1234)), + Variant.of(EMPTY_METADATA, Variants.of((short) -1234)), + Variant.of(EMPTY_METADATA, Variants.of(12345)), + Variant.of(EMPTY_METADATA, Variants.of(-12345)), + Variant.of(EMPTY_METADATA, Variants.of(9876543210L)), + Variant.of(EMPTY_METADATA, Variants.of(-9876543210L)), + Variant.of(EMPTY_METADATA, Variants.of(10.11F)), + Variant.of(EMPTY_METADATA, Variants.of(-10.11F)), + Variant.of(EMPTY_METADATA, Variants.of(14.3D)), + Variant.of(EMPTY_METADATA, Variants.of(-14.3D)), + Variant.of(EMPTY_METADATA, EMPTY_OBJECT), + Variant.of(TEST_METADATA, TEST_OBJECT), + Variant.of(TEST_METADATA, SIMILAR_OBJECT), + Variant.of(EMPTY_METADATA, Variants.ofIsoDate("2024-11-07")), + Variant.of(EMPTY_METADATA, Variants.ofIsoDate("1957-11-07")), + Variant.of(EMPTY_METADATA, Variants.ofIsoTimestamptz("2024-11-07T12:33:54.123456+00:00")), + Variant.of(EMPTY_METADATA, Variants.ofIsoTimestamptz("1957-11-07T12:33:54.123456+00:00")), + Variant.of(EMPTY_METADATA, Variants.ofIsoTimestampntz("2024-11-07T12:33:54.123456")), + Variant.of(EMPTY_METADATA, Variants.ofIsoTimestampntz("1957-11-07T12:33:54.123456")), + Variant.of(EMPTY_METADATA, Variants.of(new BigDecimal("123456.789"))), // decimal4 + Variant.of(EMPTY_METADATA, 
Variants.of(new BigDecimal("-123456.789"))), // decimal4 + Variant.of(EMPTY_METADATA, Variants.of(new BigDecimal("123456789.987654321"))), // decimal8 + Variant.of(EMPTY_METADATA, Variants.of(new BigDecimal("-123456789.987654321"))), // decimal8 + Variant.of( + EMPTY_METADATA, Variants.of(new BigDecimal("9876543210.123456789"))), // decimal16 + Variant.of( EMPTY_METADATA, Variants.of(new BigDecimal("-9876543210.123456789"))), // decimal16 - variantOf( + Variant.of( EMPTY_METADATA, Variants.of(ByteBuffer.wrap(new byte[] {0x0a, 0x0b, 0x0c, 0x0d}))), - variantOf(EMPTY_METADATA, Variants.of("iceberg")), + Variant.of(EMPTY_METADATA, Variants.of("iceberg")), }; @ParameterizedTest @@ -199,25 +200,6 @@ private static List writeAndRead( } } - private static Variant variantOf(VariantMetadata metadata, VariantValue value) { - return new Variant() { - @Override - public VariantMetadata metadata() { - return metadata; - } - - @Override - public VariantValue value() { - return value; - } - - @Override - public String toString() { - return "Variant(metadata=" + metadata + ", value=" + value + ")"; - } - }; - } - private Type toParquetSchema(VariantValue value) { return VariantVisitor.visit(value, new ParquetSchemaProducer()); } From 03ce5d84177e442a1112ba251c3637c4a9904600 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 28 Feb 2025 18:06:31 -0800 Subject: [PATCH 4/7] Pass field names to object method. 
--- .../java/org/apache/iceberg/variants/VariantVisitor.java | 6 ++++-- .../java/org/apache/iceberg/parquet/TestVariantWriters.java | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantVisitor.java b/core/src/main/java/org/apache/iceberg/variants/VariantVisitor.java index 32e285d3f266..3bee300f20a9 100644 --- a/core/src/main/java/org/apache/iceberg/variants/VariantVisitor.java +++ b/core/src/main/java/org/apache/iceberg/variants/VariantVisitor.java @@ -22,7 +22,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; public class VariantVisitor { - public R object(VariantObject object, List fieldResults) { + public R object(VariantObject object, List fieldNames, List fieldResults) { return null; } @@ -64,8 +64,10 @@ public static R visit(VariantValue value, VariantVisitor visitor) { case OBJECT: VariantObject object = value.asObject(); + List fieldNames = Lists.newArrayList(); List fieldResults = Lists.newArrayList(); for (String fieldName : object.fieldNames()) { + fieldNames.add(fieldName); visitor.beforeObjectField(fieldName); try { fieldResults.add(visit(object.get(fieldName), visitor)); @@ -74,7 +76,7 @@ public static R visit(VariantValue value, VariantVisitor visitor) { } } - return visitor.object(object, fieldResults); + return visitor.object(object, fieldNames, fieldResults); default: return visitor.primitive(value.asPrimitive()); diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java index 3ad574ce787d..b3bada2ca237 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java @@ -206,7 +206,7 @@ private Type toParquetSchema(VariantValue value) { private static class ParquetSchemaProducer extends VariantVisitor { @Override - public Type object(VariantObject 
object, List typedValues) { + public Type object(VariantObject object, List names, List typedValues) { if (object.numFields() < 1) { // Parquet cannot write typed_value group with no fields return null; @@ -214,7 +214,7 @@ public Type object(VariantObject object, List typedValues) { List fields = Lists.newArrayList(); int index = 0; - for (String name : object.fieldNames()) { + for (String name : names) { Type typedValue = typedValues.get(index); fields.add(field(name, typedValue)); index += 1; From e69ec6a56814442e59de6df8efcd1a1a1ca40df2 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Tue, 4 Mar 2025 13:32:04 -0800 Subject: [PATCH 5/7] Parquet: Document the variant shredding function. --- .../org/apache/iceberg/parquet/Parquet.java | 8 ++++++++ .../iceberg/parquet/ParquetSchemaUtil.java | 19 +++++++++++++++++++ .../iceberg/parquet/TypeToMessageType.java | 2 +- 3 files changed, 28 insertions(+), 1 deletion(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 8bd9dba9d3a2..6044babdca62 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -194,6 +194,14 @@ public WriteBuilder schema(Schema newSchema) { return this; } + /** + * Set a {@link BiFunction} that is called with each variant field's name and field ID to + * produce the shredding type as a {@code typed_value} field. This field is added to the result + * variant struct alongside the {@code metadata} and {@code value} fields. 
+ * + * @param func {@link BiFunction} that produces a shredded {@code typed_value} + * @return this for method chaining + */ public WriteBuilder variantShreddingFunc(BiFunction func) { this.variantShreddingFunc = func; return this; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java index 31a137cf85c7..2f3f21ef3bdf 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java @@ -39,10 +39,29 @@ public class ParquetSchemaUtil { private ParquetSchemaUtil() {} + /** + * Convert an Iceberg schema to Parquet. + * + * @param schema an Iceberg {@link Schema} + * @param name name for the Parquet schema + * @return the schema converted to a Parquet {@link MessageType} + */ public static MessageType convert(Schema schema, String name) { return new TypeToMessageType().convert(schema, name); } + /** + * Convert an Iceberg schema to Parquet. + * + *

Variant fields are converted by calling the function with the variant's name and field ID to + * produce the shredding type as a {@code typed_value} field. This field is added to the variant + * struct alongside the {@code metadata} and {@code value} fields. + * + * @param schema an Iceberg {@link Schema} + * @param name name for the Parquet schema + * @param variantShreddingFunc {@link BiFunction} that produces a shredded {@code typed_value} + * @return the schema converted to a Parquet {@link MessageType} + */ public static MessageType convert( Schema schema, String name, BiFunction variantShreddingFunc) { return new TypeToMessageType(variantShreddingFunc).convert(schema, name); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java index 1a65c26c0f54..44e407f72a75 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java @@ -68,7 +68,7 @@ public TypeToMessageType() { this.variantShreddingFunc = null; } - public TypeToMessageType(BiFunction variantShreddingFunc) { + TypeToMessageType(BiFunction variantShreddingFunc) { this.variantShreddingFunc = variantShreddingFunc; } From 4bcf3883a7efcd9caa4c50793118b3964c8f1e6e Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Tue, 4 Mar 2025 19:15:07 -0800 Subject: [PATCH 6/7] Pull out VariantShreddingFunction interface. 
--- .../org/apache/iceberg/parquet/Parquet.java | 13 +++--- .../iceberg/parquet/ParquetSchemaUtil.java | 12 +++--- .../parquet/VariantShreddingFunction.java | 40 +++++++++++++++++++ .../iceberg/parquet/TestVariantWriters.java | 5 +-- 4 files changed, 54 insertions(+), 16 deletions(-) create mode 100644 parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingFunction.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 6044babdca62..1d2f7b64e840 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -120,7 +120,6 @@ import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -161,7 +160,7 @@ public static class WriteBuilder implements InternalData.WriteBuilder { private final Map metadata = Maps.newLinkedHashMap(); private final Map config = Maps.newLinkedHashMap(); private Schema schema = null; - private BiFunction variantShreddingFunc = null; + private VariantShreddingFunction variantShreddingFunc = null; private String name = "table"; private WriteSupport writeSupport = null; private BiFunction> createWriterFunc = null; @@ -195,14 +194,14 @@ public WriteBuilder schema(Schema newSchema) { } /** - * Set a {@link BiFunction} that is called with each variant field's name and field ID to - * produce the shredding type as a {@code typed_value} field. This field is added to the result - * variant struct alongside the {@code metadata} and {@code value} fields. + * Set a {@link VariantShreddingFunction} that is called with each variant field's name and + * field ID to produce the shredding type as a {@code typed_value} field. 
This field is added to + * the result variant struct alongside the {@code metadata} and {@code value} fields. * - * @param func {@link BiFunction} that produces a shredded {@code typed_value} + * @param func {@link VariantShreddingFunction} that produces a shredded {@code typed_value} * @return this for method chaining */ - public WriteBuilder variantShreddingFunc(BiFunction func) { + public WriteBuilder variantShreddingFunc(VariantShreddingFunction func) { this.variantShreddingFunc = func; return this; } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java index 2f3f21ef3bdf..f4760738df68 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java @@ -21,7 +21,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiFunction; import java.util.function.Function; import org.apache.iceberg.Schema; import org.apache.iceberg.mapping.NameMapping; @@ -53,17 +52,18 @@ public static MessageType convert(Schema schema, String name) { /** * Convert an Iceberg schema to Parquet. * - *

Variant fields are converted by calling the function with the variant's name and field ID to - * produce the shredding type as a {@code typed_value} field. This field is added to the variant - * struct alongside the {@code metadata} and {@code value} fields. + *

Variant fields are converted by calling the {@link VariantShreddingFunction} with the + * variant's and field ID and name to produce the shredding type as a {@code typed_value} field. + * This field is added to the variant struct alongside the {@code metadata} and {@code value} + * fields. * * @param schema an Iceberg {@link Schema} * @param name name for the Parquet schema - * @param variantShreddingFunc {@link BiFunction} that produces a shredded {@code typed_value} + * @param variantShreddingFunc {@link VariantShreddingFunction} that produces a shredded type * @return the schema converted to a Parquet {@link MessageType} */ public static MessageType convert( - Schema schema, String name, BiFunction variantShreddingFunc) { + Schema schema, String name, VariantShreddingFunction variantShreddingFunc) { return new TypeToMessageType(variantShreddingFunc).convert(schema, name); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingFunction.java b/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingFunction.java new file mode 100644 index 000000000000..9ea4c083deb7 --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingFunction.java @@ -0,0 +1,40 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. 
See the License for the + * * specific language governing permissions and limitations + * * under the License. + * + */ + +package org.apache.iceberg.parquet; + +import java.util.function.BiFunction; +import org.apache.parquet.schema.Type; + +public interface VariantShreddingFunction extends BiFunction { + /** + * A function to produce the shredded type for a variant field. This function is called with the + * ID and name of a variant field to produce the shredded type as a {@code typed_value} field. + * This field is added to the result variant struct alongside the {@code metadata} and {@code + * value} fields. + * + * @param fieldId field ID of the variant field to shred + * @param name name of the variant field to shred + * @return a Parquet {@link Type} to use as the Variant's {@code typed_value} field + */ + @Override + Type apply(Integer fieldId, String name); +} diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java index b3bada2ca237..7b08cd8e50e4 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java @@ -24,7 +24,6 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.List; -import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.iceberg.InternalTestHelpers; @@ -171,13 +170,13 @@ public void testMixedShredding(Variant variant) throws IOException { } } - private static Record writeAndRead(BiFunction shreddingFunc, Record record) + private static Record writeAndRead(VariantShreddingFunction shreddingFunc, Record record) throws IOException { return Iterables.getOnlyElement(writeAndRead(shreddingFunc, List.of(record))); } private static List writeAndRead( - BiFunction shreddingFunc, List records) throws IOException { + VariantShreddingFunction 
shreddingFunc, List records) throws IOException { OutputFile outputFile = new InMemoryOutputFile(); try (FileAppender writer = From 36a224a732d5dcf74b5546102c7afb2c2d79ea45 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 5 Mar 2025 12:32:10 -0800 Subject: [PATCH 7/7] Apply spotless. --- .../parquet/VariantShreddingFunction.java | 31 +++++++++---------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingFunction.java b/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingFunction.java index 9ea4c083deb7..1b3201195004 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingFunction.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingFunction.java @@ -1,24 +1,21 @@ /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, - * * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * * KIND, either express or implied. 
See the License for the - * * specific language governing permissions and limitations - * * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ - package org.apache.iceberg.parquet; import java.util.function.BiFunction;