diff --git a/api/src/main/java/org/apache/iceberg/variants/Variant.java b/api/src/main/java/org/apache/iceberg/variants/Variant.java index 4376938e9e15..4ea462f7256d 100644 --- a/api/src/main/java/org/apache/iceberg/variants/Variant.java +++ b/api/src/main/java/org/apache/iceberg/variants/Variant.java @@ -29,17 +29,7 @@ public interface Variant { VariantValue value(); static Variant of(VariantMetadata metadata, VariantValue value) { - return new Variant() { - @Override - public VariantMetadata metadata() { - return metadata; - } - - @Override - public VariantValue value() { - return value; - } - }; + return new VariantData(metadata, value); } static Variant from(ByteBuffer buffer) { diff --git a/api/src/main/java/org/apache/iceberg/variants/VariantData.java b/api/src/main/java/org/apache/iceberg/variants/VariantData.java new file mode 100644 index 000000000000..20d8464c5163 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/variants/VariantData.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.variants; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +class VariantData implements Variant { + private final VariantMetadata metadata; + private final VariantValue value; + + VariantData(VariantMetadata metadata, VariantValue value) { + Preconditions.checkArgument(metadata != null, "Invalid variant metadata: null"); + Preconditions.checkArgument(value != null, "Invalid variant value: null"); + this.metadata = metadata; + this.value = value; + } + + @Override + public VariantMetadata metadata() { + return metadata; + } + + @Override + public VariantValue value() { + return value; + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java b/core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java index 1a22a652b0d9..809de747db7f 100644 --- a/core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java +++ b/core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java @@ -46,11 +46,11 @@ public class ShreddedObject implements VariantObject { private SerializationState serializationState = null; ShreddedObject(VariantMetadata metadata) { - this.metadata = metadata; - this.unshredded = null; + this(metadata, null); } ShreddedObject(VariantMetadata metadata, VariantObject unshredded) { + Preconditions.checkArgument(metadata != null, "Invalid metadata: null"); this.metadata = metadata; this.unshredded = unshredded; } diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantVisitor.java b/core/src/main/java/org/apache/iceberg/variants/VariantVisitor.java new file mode 100644 index 000000000000..3bee300f20a9 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/VariantVisitor.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class VariantVisitor { + public R object(VariantObject object, List fieldNames, List fieldResults) { + return null; + } + + public R array(VariantArray array, List elementResults) { + return null; + } + + public R primitive(VariantPrimitive primitive) { + return null; + } + + public void beforeArrayElement(int index) {} + + public void afterArrayElement(int index) {} + + public void beforeObjectField(String fieldName) {} + + public void afterObjectField(String fieldName) {} + + public static R visit(Variant variant, VariantVisitor visitor) { + return visit(variant.value(), visitor); + } + + public static R visit(VariantValue value, VariantVisitor visitor) { + switch (value.type()) { + case ARRAY: + VariantArray array = value.asArray(); + List elementResults = Lists.newArrayList(); + for (int index = 0; index < array.numElements(); index += 1) { + visitor.beforeArrayElement(index); + try { + elementResults.add(visit(array.get(index), visitor)); + } finally { + visitor.afterArrayElement(index); + } + } + + return visitor.array(array, elementResults); + + case OBJECT: + VariantObject object = value.asObject(); + List fieldNames = Lists.newArrayList(); + List fieldResults = Lists.newArrayList(); + for (String fieldName : object.fieldNames()) { 
+ fieldNames.add(fieldName); + visitor.beforeObjectField(fieldName); + try { + fieldResults.add(visit(object.get(fieldName), visitor)); + } finally { + visitor.afterObjectField(fieldName); + } + } + + return visitor.object(object, fieldNames, fieldResults); + + default: + return visitor.primitive(value.asPrimitive()); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/Variants.java b/core/src/main/java/org/apache/iceberg/variants/Variants.java index de965b6f9044..886deeb8fb31 100644 --- a/core/src/main/java/org/apache/iceberg/variants/Variants.java +++ b/core/src/main/java/org/apache/iceberg/variants/Variants.java @@ -45,6 +45,16 @@ public static ShreddedObject object(VariantMetadata metadata) { return new ShreddedObject(metadata); } + public static ShreddedObject object(VariantObject object) { + if (object instanceof ShreddedObject) { + return new ShreddedObject(((ShreddedObject) object).metadata(), object); + } else if (object instanceof SerializedObject) { + return new ShreddedObject(((SerializedObject) object).metadata(), object); + } + + throw new UnsupportedOperationException("Metadata is required for object: " + object); + } + public static VariantPrimitive of(PhysicalType type, T value) { return new PrimitiveWrapper<>(type, value); } diff --git a/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java b/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java index 3894298b7fc4..781051f11d7b 100644 --- a/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java +++ b/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java @@ -25,6 +25,8 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantTestUtil; public class InternalTestHelpers { @@ -106,6 +108,12 @@ private static void assertEquals(Type type, Object expected, Object actual) { assertThat(actual).as("Actual 
should be a Map").isInstanceOf(Map.class); assertEquals(type.asMapType(), (Map) expected, (Map) actual); break; + case VARIANT: + assertThat(expected).as("Expected should be a Variant").isInstanceOf(Variant.class); + assertThat(actual).as("Actual should be a Variant").isInstanceOf(Variant.class); + VariantTestUtil.assertEqual(((Variant) expected).metadata(), ((Variant) actual).metadata()); + VariantTestUtil.assertEqual(((Variant) expected).value(), ((Variant) actual).value()); + break; default: throw new IllegalArgumentException("Not a supported type: " + type); } diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java index 13ae65d10ab9..92aab005579c 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java @@ -18,13 +18,17 @@ */ package org.apache.iceberg.data.parquet; +import java.util.Arrays; import java.util.List; import java.util.Optional; -import org.apache.iceberg.parquet.ParquetTypeVisitor; import org.apache.iceberg.parquet.ParquetValueWriter; import org.apache.iceberg.parquet.ParquetValueWriters; +import org.apache.iceberg.parquet.ParquetVariantVisitor; +import org.apache.iceberg.parquet.TypeWithSchemaVisitor; +import org.apache.iceberg.parquet.VariantWriterBuilder; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; @@ -34,9 +38,14 @@ abstract class BaseParquetWriter { - @SuppressWarnings("unchecked") protected ParquetValueWriter createWriter(MessageType type) { - return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type)); + 
return createWriter(null, type); + } + + @SuppressWarnings("unchecked") + protected ParquetValueWriter createWriter(Types.StructType struct, MessageType type) { + return (ParquetValueWriter) + TypeWithSchemaVisitor.visit(struct, type, new WriteBuilder(type)); } protected abstract ParquetValueWriters.StructWriter createStructWriter( @@ -62,7 +71,7 @@ protected ParquetValueWriter timestampWriter(ColumnDescriptor desc, boolean i } } - private class WriteBuilder extends ParquetTypeVisitor> { + private class WriteBuilder extends TypeWithSchemaVisitor> { private final MessageType type; private WriteBuilder(MessageType type) { @@ -71,14 +80,14 @@ private WriteBuilder(MessageType type) { @Override public ParquetValueWriter message( - MessageType message, List> fieldWriters) { + Types.StructType struct, MessageType message, List> fieldWriters) { - return struct(message.asGroupType(), fieldWriters); + return struct(struct, message.asGroupType(), fieldWriters); } @Override public ParquetValueWriter struct( - GroupType struct, List> fieldWriters) { + Types.StructType iceberg, GroupType struct, List> fieldWriters) { List fields = struct.getFields(); List> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size()); for (int i = 0; i < fields.size(); i += 1) { @@ -91,7 +100,8 @@ public ParquetValueWriter struct( } @Override - public ParquetValueWriter list(GroupType array, ParquetValueWriter elementWriter) { + public ParquetValueWriter list( + Types.ListType iceberg, GroupType array, ParquetValueWriter elementWriter) { GroupType repeated = array.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -107,7 +117,10 @@ public ParquetValueWriter list(GroupType array, ParquetValueWriter element @Override public ParquetValueWriter map( - GroupType map, ParquetValueWriter keyWriter, ParquetValueWriter valueWriter) { + Types.MapType iceberg, + GroupType map, + ParquetValueWriter keyWriter, + ParquetValueWriter valueWriter) { GroupType repeatedKeyValue = 
map.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -127,7 +140,8 @@ public ParquetValueWriter map( } @Override - public ParquetValueWriter primitive(PrimitiveType primitive) { + public ParquetValueWriter primitive( + org.apache.iceberg.types.Type.PrimitiveType iceberg, PrimitiveType primitive) { ColumnDescriptor desc = type.getColumnDescription(currentPath()); LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation(); if (logicalType != null) { @@ -157,6 +171,16 @@ public ParquetValueWriter primitive(PrimitiveType primitive) { throw new UnsupportedOperationException("Unsupported type: " + primitive); } } + + @Override + public ParquetValueWriter variant(Types.VariantType iVariant, ParquetValueWriter result) { + return result; + } + + @Override + public ParquetVariantVisitor> variantVisitor() { + return new VariantWriterBuilder(type, Arrays.asList(currentPath())); + } } private class LogicalTypeWriterVisitor diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java index b42f07ce18ce..79e4989efbeb 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java @@ -24,6 +24,7 @@ import org.apache.iceberg.parquet.ParquetValueWriters; import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter; import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.MessageType; @@ -38,9 +39,14 @@ public class InternalWriter extends BaseParquetWriter { private InternalWriter() {} - @SuppressWarnings("unchecked") public static ParquetValueWriter create(MessageType type) { - return (ParquetValueWriter) INSTANCE.createWriter(type); + return create(null, type); + } + + @SuppressWarnings("unchecked") + public static 
ParquetValueWriter create( + Types.StructType struct, MessageType type) { + return (ParquetValueWriter) INSTANCE.createWriter(struct, type); } @Override diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index ae25305629ef..1d2f7b64e840 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -160,9 +160,10 @@ public static class WriteBuilder implements InternalData.WriteBuilder { private final Map metadata = Maps.newLinkedHashMap(); private final Map config = Maps.newLinkedHashMap(); private Schema schema = null; + private VariantShreddingFunction variantShreddingFunc = null; private String name = "table"; private WriteSupport writeSupport = null; - private Function> createWriterFunc = null; + private BiFunction> createWriterFunc = null; private MetricsConfig metricsConfig = MetricsConfig.getDefault(); private ParquetFileWriter.Mode writeMode = ParquetFileWriter.Mode.CREATE; private WriterVersion writerVersion = WriterVersion.PARQUET_1_0; @@ -192,6 +193,19 @@ public WriteBuilder schema(Schema newSchema) { return this; } + /** + * Set a {@link VariantShreddingFunction} that is called with each variant field's name and + * field ID to produce the shredding type as a {@code typed_value} field. This field is added to + * the result variant struct alongside the {@code metadata} and {@code value} fields. 
+ * + * @param func {@link VariantShreddingFunction} that produces a shredded {@code typed_value} + * @return this for method chaining + */ + public WriteBuilder variantShreddingFunc(VariantShreddingFunction func) { + this.variantShreddingFunc = func; + return this; + } + @Override public WriteBuilder named(String newName) { this.name = newName; @@ -222,7 +236,9 @@ public WriteBuilder meta(String property, String value) { public WriteBuilder createWriterFunc( Function> newCreateWriterFunc) { - this.createWriterFunc = newCreateWriterFunc; + if (newCreateWriterFunc != null) { + this.createWriterFunc = (icebergSchema, type) -> newCreateWriterFunc.apply(type); + } return this; } @@ -292,6 +308,7 @@ private void setBloomFilterConfig( Map fieldIdToParquetPath = parquetSchema.getColumns().stream() + .filter(col -> col.getPrimitiveType().getId() != null) .collect( Collectors.toMap( col -> col.getPrimitiveType().getId().intValue(), @@ -362,7 +379,7 @@ public FileAppender build() throws IOException { } set("parquet.avro.write-old-list-structure", "false"); - MessageType type = ParquetSchemaUtil.convert(schema, name); + MessageType type = ParquetSchemaUtil.convert(schema, name, variantShreddingFunc); FileEncryptionProperties fileEncryptionProperties = null; if (fileEncryptionKey != null) { @@ -406,6 +423,7 @@ public FileAppender build() throws IOException { conf, file, schema, + type, rowGroupSize, metadata, createWriterFunc, diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java index 68a9aa979fdf..f4760738df68 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java @@ -38,10 +38,35 @@ public class ParquetSchemaUtil { private ParquetSchemaUtil() {} + /** + * Convert an Iceberg schema to Parquet. 
+ * + * @param schema an Iceberg {@link Schema} + * @param name name for the Parquet schema + * @return the schema converted to a Parquet {@link MessageType} + */ public static MessageType convert(Schema schema, String name) { return new TypeToMessageType().convert(schema, name); } + /** + * Convert an Iceberg schema to Parquet. + * + *

Variant fields are converted by calling the {@link VariantShreddingFunction} with the + * variant's field ID and name to produce the shredding type as a {@code typed_value} field. + * This field is added to the variant struct alongside the {@code metadata} and {@code value} + * fields. + * + * @param schema an Iceberg {@link Schema} + * @param name name for the Parquet schema + * @param variantShreddingFunc {@link VariantShreddingFunction} that produces a shredded type + * @return the schema converted to a Parquet {@link MessageType} + */ + public static MessageType convert( + Schema schema, String name, VariantShreddingFunction variantShreddingFunc) { + return new TypeToMessageType(variantShreddingFunc).convert(schema, name); + } + /** * Converts a Parquet schema to an Iceberg schema. Fields without IDs are kept and assigned * fallback IDs. diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java index 1a7ebe0767d8..f6b729f9f25f 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java @@ -71,6 +71,10 @@ public static UnboxedWriter shorts(ColumnDescriptor desc) { return new ShortWriter(desc); } + public static ParquetValueWriter unboxed(ColumnDescriptor desc) { + return new UnboxedWriter<>(desc); + } + public static UnboxedWriter ints(ColumnDescriptor desc) { return new UnboxedWriter<>(desc); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java index b0e132afad5f..3e5635958c0a 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java @@ -351,31 +351,7 @@ private VariantReader( public Variant read(Variant ignored) { 
VariantMetadata metadata = metadataReader.read(null); VariantValue value = valueReader.read(metadata); - if (value == MISSING) { - return new Variant() { - @Override - public VariantMetadata metadata() { - return metadata; - } - - @Override - public VariantValue value() { - return Variants.ofNull(); - } - }; - } - - return new Variant() { - @Override - public VariantMetadata metadata() { - return metadata; - } - - @Override - public VariantValue value() { - return value; - } - }; + return Variant.of(metadata, value != MISSING ? value : Variants.ofNull()); } @Override diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantWriters.java new file mode 100644 index 000000000000..330b349b3003 --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantWriters.java @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.parquet; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.variants.PhysicalType; +import org.apache.iceberg.variants.ShreddedObject; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantObject; +import org.apache.iceberg.variants.VariantValue; +import org.apache.iceberg.variants.Variants; +import org.apache.parquet.column.ColumnWriteStore; + +class ParquetVariantWriters { + private ParquetVariantWriters() {} + + @SuppressWarnings("unchecked") + static ParquetValueWriter variant( + ParquetValueWriter metadataWriter, ParquetValueWriter valueWriter) { + return new VariantWriter( + (ParquetValueWriter) metadataWriter, + (ParquetValueWriter) valueWriter); + } + + @SuppressWarnings("unchecked") + static ParquetValueWriter metadata(ParquetValueWriter bytesWriter) { + return new VariantMetadataWriter((ParquetValueWriter) bytesWriter); + } + + @SuppressWarnings("unchecked") + static ParquetValueWriter value(ParquetValueWriter bytesWriter) { + return new VariantValueWriter((ParquetValueWriter) bytesWriter); + } + + static ParquetValueWriter primitive( + ParquetValueWriter writer, PhysicalType... 
types) { + return new PrimitiveWriter<>(writer, Sets.immutableEnumSet(Arrays.asList(types))); + } + + @SuppressWarnings("unchecked") + static ParquetValueWriter shredded( + int valueDefinitionLevel, + ParquetValueWriter valueWriter, + int typedDefinitionLevel, + ParquetValueWriter typedWriter) { + return new ShreddedVariantWriter( + valueDefinitionLevel, + (ParquetValueWriter) valueWriter, + typedDefinitionLevel, + (TypedWriter) typedWriter); + } + + @SuppressWarnings("unchecked") + static ParquetValueWriter objects( + int valueDefinitionLevel, + ParquetValueWriter valueWriter, + int typedDefinitionLevel, + int fieldDefinitionLevel, + List fieldNames, + List> fieldWriters) { + ImmutableMap.Builder> builder = ImmutableMap.builder(); + for (int i = 0; i < fieldNames.size(); i += 1) { + builder.put(fieldNames.get(i), (ParquetValueWriter) fieldWriters.get(i)); + } + + return new ShreddedObjectWriter( + valueDefinitionLevel, + (ParquetValueWriter) valueWriter, + typedDefinitionLevel, + fieldDefinitionLevel, + builder.build()); + } + + private static class VariantWriter implements ParquetValueWriter { + private final ParquetValueWriter metadataWriter; + private final ParquetValueWriter valueWriter; + private final List> children; + + private VariantWriter( + ParquetValueWriter metadataWriter, + ParquetValueWriter valueWriter) { + this.metadataWriter = metadataWriter; + this.valueWriter = valueWriter; + this.children = children(metadataWriter, valueWriter); + } + + @Override + public void write(int repetitionLevel, Variant variant) { + metadataWriter.write(repetitionLevel, variant.metadata()); + valueWriter.write(repetitionLevel, variant.value()); + } + + @Override + public List> columns() { + return children; + } + + @Override + public void setColumnStore(ColumnWriteStore columnStore) { + metadataWriter.setColumnStore(columnStore); + valueWriter.setColumnStore(columnStore); + } + } + + private abstract static class VariantBinaryWriter implements ParquetValueWriter { + 
private final ParquetValueWriter bytesWriter; + private ByteBuffer reusedBuffer = ByteBuffer.allocate(2048).order(ByteOrder.LITTLE_ENDIAN); + + private VariantBinaryWriter(ParquetValueWriter bytesWriter) { + this.bytesWriter = bytesWriter; + } + + protected abstract int sizeInBytes(T value); + + protected abstract int writeTo(ByteBuffer buffer, int offset, T value); + + @Override + public void write(int repetitionLevel, T value) { + bytesWriter.write(repetitionLevel, serialize(value)); + } + + @Override + public List> columns() { + return bytesWriter.columns(); + } + + @Override + public void setColumnStore(ColumnWriteStore columnStore) { + bytesWriter.setColumnStore(columnStore); + } + + private void ensureCapacity(int requiredSize) { + if (reusedBuffer.capacity() < requiredSize) { + int newCapacity = capacityFor(requiredSize); + this.reusedBuffer = ByteBuffer.allocate(newCapacity).order(ByteOrder.LITTLE_ENDIAN); + } else { + reusedBuffer.limit(requiredSize); + } + } + + private ByteBuffer serialize(T value) { + ensureCapacity(sizeInBytes(value)); + int size = writeTo(reusedBuffer, 0, value); + reusedBuffer.position(0); + reusedBuffer.limit(size); + return reusedBuffer; + } + } + + private static class VariantMetadataWriter extends VariantBinaryWriter { + private VariantMetadataWriter(ParquetValueWriter bytesWriter) { + super(bytesWriter); + } + + @Override + protected int sizeInBytes(VariantMetadata metadata) { + return metadata.sizeInBytes(); + } + + @Override + protected int writeTo(ByteBuffer buffer, int offset, VariantMetadata metadata) { + return metadata.writeTo(buffer, offset); + } + } + + private static class VariantValueWriter extends VariantBinaryWriter { + private VariantValueWriter(ParquetValueWriter bytesWriter) { + super(bytesWriter); + } + + @Override + protected int sizeInBytes(VariantValue value) { + return value.sizeInBytes(); + } + + @Override + protected int writeTo(ByteBuffer buffer, int offset, VariantValue value) { + return 
value.writeTo(buffer, offset); + } + } + + private interface TypedWriter extends ParquetValueWriter { + Set types(); + } + + private static class PrimitiveWriter implements TypedWriter { + private final Set types; + private final ParquetValueWriter writer; + + private PrimitiveWriter(ParquetValueWriter writer, Set types) { + this.types = types; + this.writer = writer; + } + + @Override + public Set types() { + return types; + } + + @Override + @SuppressWarnings("unchecked") + public void write(int repetitionLevel, VariantValue value) { + writer.write(repetitionLevel, (T) value.asPrimitive().get()); + } + + @Override + public List> columns() { + return writer.columns(); + } + + @Override + public void setColumnStore(ColumnWriteStore columnStore) { + writer.setColumnStore(columnStore); + } + } + + private static class ShreddedVariantWriter implements ParquetValueWriter { + private final int valueDefinitionLevel; + private final ParquetValueWriter valueWriter; + private final int typedDefinitionLevel; + private final TypedWriter typedWriter; + private final List> children; + + private ShreddedVariantWriter( + int valueDefinitionLevel, + ParquetValueWriter valueWriter, + int typedDefinitionLevel, + TypedWriter typedWriter) { + this.valueDefinitionLevel = valueDefinitionLevel; + this.valueWriter = valueWriter; + this.typedDefinitionLevel = typedDefinitionLevel; + this.typedWriter = typedWriter; + this.children = children(valueWriter, typedWriter); + } + + @Override + public void write(int repetitionLevel, VariantValue value) { + if (typedWriter.types().contains(value.type())) { + typedWriter.write(repetitionLevel, value); + writeNull(valueWriter, repetitionLevel, valueDefinitionLevel); + } else { + valueWriter.write(repetitionLevel, value); + writeNull(typedWriter, repetitionLevel, typedDefinitionLevel); + } + } + + @Override + public List> columns() { + return children; + } + + @Override + public void setColumnStore(ColumnWriteStore columnStore) { + 
valueWriter.setColumnStore(columnStore); + typedWriter.setColumnStore(columnStore); + } + } + + private static class ShreddedObjectWriter implements ParquetValueWriter { + private final int valueDefinitionLevel; + private final ParquetValueWriter valueWriter; + private final int typedDefinitionLevel; + private final int fieldDefinitionLevel; + private final Map> typedWriters; + private final List> children; + + private ShreddedObjectWriter( + int valueDefinitionLevel, + ParquetValueWriter valueWriter, + int typedDefinitionLevel, + int fieldDefinitionLevel, + Map> typedWriters) { + this.valueDefinitionLevel = valueDefinitionLevel; + this.valueWriter = valueWriter; + this.typedDefinitionLevel = typedDefinitionLevel; + this.fieldDefinitionLevel = fieldDefinitionLevel; + this.typedWriters = typedWriters; + this.children = + children( + Iterables.concat( + ImmutableList.of(valueWriter), ImmutableList.copyOf(typedWriters.values()))); + } + + @Override + public void write(int repetitionLevel, VariantValue value) { + if (value.type() != PhysicalType.OBJECT) { + valueWriter.write(repetitionLevel, value); + + // write null for the typed_value group + for (ParquetValueWriter writer : typedWriters.values()) { + writeNull(writer, repetitionLevel, typedDefinitionLevel); + } + + } else { + VariantObject object = value.asObject(); + ShreddedObject shredded = Variants.object(object); + for (Map.Entry> entry : typedWriters.entrySet()) { + String fieldName = entry.getKey(); + ParquetValueWriter writer = entry.getValue(); + + VariantValue fieldValue = object.get(fieldName); + if (fieldValue != null) { + // shredded: suppress the field in the object and write it to the value pair + shredded.remove(fieldName); + writer.write(repetitionLevel, fieldValue); + } else { + // missing: write null to both value and typed_value + writeNull(writer, repetitionLevel, fieldDefinitionLevel); + } + } + + if (shredded.numFields() > 0) { + // partially shredded: write the unshredded fields + 
valueWriter.write(repetitionLevel, shredded); + } else { + // completely shredded: omit the empty value + writeNull(valueWriter, repetitionLevel, valueDefinitionLevel); + } + } + } + + @Override + public List> columns() { + return children; + } + + @Override + public void setColumnStore(ColumnWriteStore columnStore) { + valueWriter.setColumnStore(columnStore); + for (ParquetValueWriter fieldWriter : typedWriters.values()) { + fieldWriter.setColumnStore(columnStore); + } + } + } + + private static void writeNull( + ParquetValueWriter writer, int repetitionLevel, int definitionLevel) { + for (TripleWriter column : writer.columns()) { + column.writeNull(repetitionLevel, definitionLevel - 1); + } + } + + private static List> children(ParquetValueWriter... writers) { + return children(Arrays.asList(writers)); + } + + private static List> children(Iterable> writers) { + return ImmutableList.copyOf( + Iterables.concat(Iterables.transform(writers, ParquetValueWriter::columns))); + } + + private static final double LN2 = Math.log(2); + + private static int capacityFor(int valueSize) { + // find the power of 2 size that fits the value + int nextPow2 = (int) Math.ceil(Math.log(valueSize) / LN2); + // return a capacity that is 2x the next power of 2 size up to the max + return Math.min(1 << (nextPow2 + 1), Integer.MAX_VALUE - 1); + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java index 099cffc33bb8..b808734744a9 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java @@ -23,7 +23,7 @@ import java.io.UncheckedIOException; import java.util.List; import java.util.Map; -import java.util.function.Function; +import java.util.function.BiFunction; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.Metrics; import org.apache.iceberg.MetricsConfig; @@ -75,9 +75,10 
@@ class ParquetWriter implements FileAppender, Closeable { Configuration conf, OutputFile output, Schema schema, + MessageType parquetSchema, long rowGroupSize, Map metadata, - Function> createWriterFunc, + BiFunction> createWriterFunc, CompressionCodecName codec, ParquetProperties properties, MetricsConfig metricsConfig, @@ -88,8 +89,8 @@ class ParquetWriter implements FileAppender, Closeable { this.metadata = ImmutableMap.copyOf(metadata); this.compressor = new ParquetCodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec); - this.parquetSchema = ParquetSchemaUtil.convert(schema, "table"); - this.model = (ParquetValueWriter) createWriterFunc.apply(parquetSchema); + this.parquetSchema = parquetSchema; + this.model = (ParquetValueWriter) createWriterFunc.apply(schema, parquetSchema); this.metricsConfig = metricsConfig; this.columnIndexTruncateLength = conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java index cb40693e24a4..44e407f72a75 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java @@ -26,8 +26,10 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import java.util.function.BiFunction; import org.apache.iceberg.Schema; import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Type.NestedType; import org.apache.iceberg.types.Type.PrimitiveType; import org.apache.iceberg.types.TypeUtil; @@ -56,6 +58,19 @@ public class TypeToMessageType { LogicalTypeAnnotation.timestampType(false /* not adjusted to UTC */, TimeUnit.MICROS); private static final 
LogicalTypeAnnotation TIMESTAMPTZ_MICROS = LogicalTypeAnnotation.timestampType(true /* adjusted to UTC */, TimeUnit.MICROS); + private static final String METADATA = "metadata"; + private static final String VALUE = "value"; + private static final String TYPED_VALUE = "typed_value"; + + private final BiFunction variantShreddingFunc; + + public TypeToMessageType() { + this.variantShreddingFunc = null; + } + + TypeToMessageType(BiFunction variantShreddingFunc) { + this.variantShreddingFunc = variantShreddingFunc; + } public MessageType convert(Schema schema, String name) { Types.MessageTypeBuilder builder = Types.buildMessage(); @@ -122,13 +137,41 @@ public GroupType map(MapType map, Type.Repetition repetition, int id, String nam public Type variant(Type.Repetition repetition, int id, String originalName) { String name = AvroSchemaUtil.makeCompatibleName(originalName); - return Types.buildGroup(repetition) - .id(id) - .required(BINARY) - .named("metadata") - .required(BINARY) - .named("value") - .named(name); + Type shreddedType; + if (variantShreddingFunc != null) { + shreddedType = variantShreddingFunc.apply(id, originalName); + } else { + shreddedType = null; + } + + if (shreddedType != null) { + Preconditions.checkArgument( + shreddedType.getName().equals(TYPED_VALUE), + "Invalid shredded type name: %s should be typed_value", + shreddedType.getName()); + Preconditions.checkArgument( + shreddedType.isRepetition(Type.Repetition.OPTIONAL), + "Invalid shredded type repetition: %s should be OPTIONAL", + shreddedType.getRepetition()); + + return Types.buildGroup(repetition) + .id(id) + .required(BINARY) + .named(METADATA) + .optional(BINARY) + .named(VALUE) + .addField(shreddedType) + .named(name); + + } else { + return Types.buildGroup(repetition) + .id(id) + .required(BINARY) + .named(METADATA) + .required(BINARY) + .named(VALUE) + .named(name); + } } public Type primitive( diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingFunction.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingFunction.java new file mode 100644 index 000000000000..1b3201195004 --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VariantShreddingFunction.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import java.util.function.BiFunction; +import org.apache.parquet.schema.Type; + +public interface VariantShreddingFunction extends BiFunction { + /** + * A function to produce the shredded type for a variant field. This function is called with the + * ID and name of a variant field to produce the shredded type as a {@code typed_value} field. + * This field is added to the result variant struct alongside the {@code metadata} and {@code + * value} fields. 
+ * + * @param fieldId field ID of the variant field to shred + * @param name name of the variant field to shred + * @return a Parquet {@link Type} to use as the Variant's {@code typed_value} field + */ + @Override + Type apply(Integer fieldId, String name); +} diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VariantWriterBuilder.java b/parquet/src/main/java/org/apache/iceberg/parquet/VariantWriterBuilder.java new file mode 100644 index 000000000000..afe2907a4091 --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VariantWriterBuilder.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.parquet; + +import java.util.Deque; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.variants.PhysicalType; +import org.apache.iceberg.variants.VariantValue; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; + +public class VariantWriterBuilder extends ParquetVariantVisitor> { + private final MessageType schema; + private final Iterable basePath; + private final Deque fieldNames = Lists.newLinkedList(); + + public VariantWriterBuilder(MessageType schema, Iterable basePath) { + this.schema = schema; + this.basePath = basePath; + } + + @Override + public void beforeField(Type type) { + fieldNames.addLast(type.getName()); + } + + @Override + public void 
afterField(Type type) { + fieldNames.removeLast(); + } + + private String[] currentPath() { + return Streams.concat(Streams.stream(basePath), fieldNames.stream()).toArray(String[]::new); + } + + private String[] path(String... names) { + return Streams.concat(Streams.stream(basePath), fieldNames.stream(), Stream.of(names)) + .toArray(String[]::new); + } + + @Override + public ParquetValueWriter variant( + GroupType variant, ParquetValueWriter metadataWriter, ParquetValueWriter valueWriter) { + return ParquetVariantWriters.variant(metadataWriter, valueWriter); + } + + @Override + public ParquetValueWriter metadata(PrimitiveType metadata) { + ColumnDescriptor desc = schema.getColumnDescription(currentPath()); + return ParquetVariantWriters.metadata(ParquetValueWriters.byteBuffers(desc)); + } + + @Override + public ParquetValueWriter serialized(PrimitiveType value) { + ColumnDescriptor desc = schema.getColumnDescription(currentPath()); + return ParquetVariantWriters.value(ParquetValueWriters.byteBuffers(desc)); + } + + @Override + public ParquetValueWriter primitive(PrimitiveType primitive) { + ColumnDescriptor desc = schema.getColumnDescription(currentPath()); + LogicalTypeAnnotation annotation = primitive.getLogicalTypeAnnotation(); + if (annotation != null) { + Optional> writer = + annotation.accept(new LogicalTypeToVariantWriter(desc)); + if (writer.isPresent()) { + return writer.get(); + } + + } else { + switch (primitive.getPrimitiveTypeName()) { + case BINARY: + return ParquetVariantWriters.primitive( + ParquetValueWriters.byteBuffers(desc), PhysicalType.BINARY); + case BOOLEAN: + return ParquetVariantWriters.primitive( + ParquetValueWriters.booleans(desc), + PhysicalType.BOOLEAN_TRUE, + PhysicalType.BOOLEAN_FALSE); + case INT32: + return ParquetVariantWriters.primitive( + ParquetValueWriters.ints(desc), PhysicalType.INT32); + case INT64: + return ParquetVariantWriters.primitive( + ParquetValueWriters.longs(desc), PhysicalType.INT64); + case FLOAT: + // use an 
unboxed writer to skip metrics collection that requires an ID + return ParquetVariantWriters.primitive( + ParquetValueWriters.unboxed(desc), PhysicalType.FLOAT); + case DOUBLE: + // use an unboxed writer to skip metrics collection that requires an ID + return ParquetVariantWriters.primitive( + ParquetValueWriters.unboxed(desc), PhysicalType.DOUBLE); + } + } + + throw new UnsupportedOperationException("Unsupported shredded value type: " + primitive); + } + + @Override + public ParquetValueWriter value( + GroupType value, ParquetValueWriter valueWriter, ParquetValueWriter typedWriter) { + int valueDL = schema.getMaxDefinitionLevel(path(VALUE)); + if (typedWriter != null) { + int typedValueDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE)); + return ParquetVariantWriters.shredded(valueDL, valueWriter, typedValueDL, typedWriter); + } else if (value.getType(VALUE).isRepetition(Type.Repetition.OPTIONAL)) { + return ParquetValueWriters.option(value.getType(VALUE), valueDL, valueWriter); + } else { + return valueWriter; + } + } + + @Override + public ParquetValueWriter object( + GroupType object, + ParquetValueWriter valueWriter, + List> fieldWriters) { + int valueDL = schema.getMaxDefinitionLevel(path(VALUE)); + int typedDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE)); + GroupType firstField = object.getType(TYPED_VALUE).asGroupType().getType(0).asGroupType(); + int fieldDL = + schema.getMaxDefinitionLevel( + path(TYPED_VALUE, firstField.getName(), firstField.getType(0).getName())); + + List names = + object.getType(TYPED_VALUE).asGroupType().getFields().stream() + .map(Type::getName) + .collect(Collectors.toList()); + + return ParquetVariantWriters.objects( + valueDL, valueWriter, typedDL, fieldDL, names, fieldWriters); + } + + @Override + public ParquetValueWriter array( + GroupType array, ParquetValueWriter valueWriter, ParquetValueWriter elementWriter) { + throw new UnsupportedOperationException("Array is not yet supported"); + } + + private static class 
LogicalTypeToVariantWriter + implements LogicalTypeAnnotationVisitor> { + private final ColumnDescriptor desc; + + private LogicalTypeToVariantWriter(ColumnDescriptor desc) { + this.desc = desc; + } + + @Override + public Optional> visit(StringLogicalTypeAnnotation ignored) { + ParquetValueWriter writer = + ParquetVariantWriters.primitive(ParquetValueWriters.strings(desc), PhysicalType.STRING); + return Optional.of(writer); + } + + @Override + public Optional> visit(DecimalLogicalTypeAnnotation decimal) { + ParquetValueWriter writer; + switch (desc.getPrimitiveType().getPrimitiveTypeName()) { + case FIXED_LEN_BYTE_ARRAY: + case BINARY: + writer = + ParquetVariantWriters.primitive( + ParquetValueWriters.decimalAsFixed( + desc, decimal.getPrecision(), decimal.getScale()), + PhysicalType.DECIMAL16); + return Optional.of(writer); + case INT64: + writer = + ParquetVariantWriters.primitive( + ParquetValueWriters.decimalAsLong( + desc, decimal.getPrecision(), decimal.getScale()), + PhysicalType.DECIMAL8); + return Optional.of(writer); + case INT32: + writer = + ParquetVariantWriters.primitive( + ParquetValueWriters.decimalAsInteger( + desc, decimal.getPrecision(), decimal.getScale()), + PhysicalType.DECIMAL4); + return Optional.of(writer); + } + + throw new IllegalArgumentException( + "Invalid primitive type for decimal: " + desc.getPrimitiveType()); + } + + @Override + public Optional> visit(DateLogicalTypeAnnotation ignored) { + ParquetValueWriter writer = + ParquetVariantWriters.primitive(ParquetValueWriters.ints(desc), PhysicalType.DATE); + return Optional.of(writer); + } + + @Override + public Optional> visit(TimeLogicalTypeAnnotation time) { + // ParquetValueWriter writer = + // ParquetVariantWriters.primitive(ParquetValueWriters.longs(desc), PhysicalType.TIME); + return Optional.empty(); + } + + @Override + public Optional> visit(TimestampLogicalTypeAnnotation timestamp) { + if (timestamp.getUnit() == TimeUnit.MICROS) { + PhysicalType type = + 
timestamp.isAdjustedToUTC() ? PhysicalType.TIMESTAMPTZ : PhysicalType.TIMESTAMPNTZ; + ParquetValueWriter writer = + ParquetVariantWriters.primitive(ParquetValueWriters.longs(desc), type); + return Optional.of(writer); + } + + throw new IllegalArgumentException( + "Invalid unit for shredded timestamp: " + timestamp.getUnit()); + } + + @Override + public Optional> visit(IntLogicalTypeAnnotation logical) { + Preconditions.checkArgument( + logical.isSigned(), "Invalid logical type for variant, unsigned: %s", logical); + ParquetValueWriter writer; + switch (logical.getBitWidth()) { + case 8: + writer = + ParquetVariantWriters.primitive( + ParquetValueWriters.tinyints(desc), PhysicalType.INT8); + return Optional.of(writer); + case 16: + writer = + ParquetVariantWriters.primitive(ParquetValueWriters.shorts(desc), PhysicalType.INT16); + return Optional.of(writer); + case 32: + writer = + ParquetVariantWriters.primitive(ParquetValueWriters.ints(desc), PhysicalType.INT32); + return Optional.of(writer); + case 64: + writer = + ParquetVariantWriters.primitive(ParquetValueWriters.longs(desc), PhysicalType.INT64); + return Optional.of(writer); + } + + throw new IllegalArgumentException("Invalid bit width for int: " + logical.getBitWidth()); + } + + @Override + public Optional> visit(UUIDLogicalTypeAnnotation uuidLogicalType) { + // ParquetValueWriter writer = + // ParquetVariantWriters.primitive(ParquetValueWriters.uuids(desc), PhysicalType.UUID); + return Optional.empty(); + } + } +} diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java new file mode 100644 index 000000000000..7b08cd8e50e4 --- /dev/null +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.iceberg.InternalTestHelpers; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.InternalReader; +import org.apache.iceberg.data.parquet.InternalWriter; +import org.apache.iceberg.inmemory.InMemoryOutputFile; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantArray; +import 
org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantObject; +import org.apache.iceberg.variants.VariantPrimitive; +import org.apache.iceberg.variants.VariantTestUtil; +import org.apache.iceberg.variants.VariantValue; +import org.apache.iceberg.variants.VariantVisitor; +import org.apache.iceberg.variants.Variants; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types.GroupBuilder; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.FieldSource; + +public class TestVariantWriters { + private static final Schema SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "var", Types.VariantType.get())); + + private static final GenericRecord RECORD = GenericRecord.create(SCHEMA); + + private static final ByteBuffer TEST_METADATA_BUFFER = + VariantTestUtil.createMetadata(ImmutableList.of("a", "b", "c", "d", "e"), true); + private static final ByteBuffer TEST_OBJECT_BUFFER = + VariantTestUtil.createObject( + TEST_METADATA_BUFFER, + ImmutableMap.of( + "a", Variants.ofNull(), + "d", Variants.of("iceberg"))); + private static final ByteBuffer SIMILAR_OBJECT_BUFFER = + VariantTestUtil.createObject( + TEST_METADATA_BUFFER, + ImmutableMap.of( + "a", Variants.of(123456789), + "c", Variants.of("string"))); + private static final ByteBuffer EMPTY_OBJECT_BUFFER = + VariantTestUtil.createObject(TEST_METADATA_BUFFER, ImmutableMap.of()); + + private static final VariantMetadata EMPTY_METADATA = + Variants.metadata(VariantTestUtil.emptyMetadata()); + private static final VariantMetadata TEST_METADATA = Variants.metadata(TEST_METADATA_BUFFER); + private static final VariantObject TEST_OBJECT = + (VariantObject) Variants.value(TEST_METADATA, TEST_OBJECT_BUFFER); + private 
static final VariantObject SIMILAR_OBJECT = + (VariantObject) Variants.value(TEST_METADATA, SIMILAR_OBJECT_BUFFER); + private static final VariantObject EMPTY_OBJECT = + (VariantObject) Variants.value(TEST_METADATA, EMPTY_OBJECT_BUFFER); + + private static final Variant[] VARIANTS = + new Variant[] { + Variant.of(EMPTY_METADATA, Variants.ofNull()), + Variant.of(EMPTY_METADATA, Variants.of(true)), + Variant.of(EMPTY_METADATA, Variants.of(false)), + Variant.of(EMPTY_METADATA, Variants.of((byte) 34)), + Variant.of(EMPTY_METADATA, Variants.of((byte) -34)), + Variant.of(EMPTY_METADATA, Variants.of((short) 1234)), + Variant.of(EMPTY_METADATA, Variants.of((short) -1234)), + Variant.of(EMPTY_METADATA, Variants.of(12345)), + Variant.of(EMPTY_METADATA, Variants.of(-12345)), + Variant.of(EMPTY_METADATA, Variants.of(9876543210L)), + Variant.of(EMPTY_METADATA, Variants.of(-9876543210L)), + Variant.of(EMPTY_METADATA, Variants.of(10.11F)), + Variant.of(EMPTY_METADATA, Variants.of(-10.11F)), + Variant.of(EMPTY_METADATA, Variants.of(14.3D)), + Variant.of(EMPTY_METADATA, Variants.of(-14.3D)), + Variant.of(EMPTY_METADATA, EMPTY_OBJECT), + Variant.of(TEST_METADATA, TEST_OBJECT), + Variant.of(TEST_METADATA, SIMILAR_OBJECT), + Variant.of(EMPTY_METADATA, Variants.ofIsoDate("2024-11-07")), + Variant.of(EMPTY_METADATA, Variants.ofIsoDate("1957-11-07")), + Variant.of(EMPTY_METADATA, Variants.ofIsoTimestamptz("2024-11-07T12:33:54.123456+00:00")), + Variant.of(EMPTY_METADATA, Variants.ofIsoTimestamptz("1957-11-07T12:33:54.123456+00:00")), + Variant.of(EMPTY_METADATA, Variants.ofIsoTimestampntz("2024-11-07T12:33:54.123456")), + Variant.of(EMPTY_METADATA, Variants.ofIsoTimestampntz("1957-11-07T12:33:54.123456")), + Variant.of(EMPTY_METADATA, Variants.of(new BigDecimal("123456.789"))), // decimal4 + Variant.of(EMPTY_METADATA, Variants.of(new BigDecimal("-123456.789"))), // decimal4 + Variant.of(EMPTY_METADATA, Variants.of(new BigDecimal("123456789.987654321"))), // decimal8 + 
Variant.of(EMPTY_METADATA, Variants.of(new BigDecimal("-123456789.987654321"))), // decimal8 + Variant.of( + EMPTY_METADATA, Variants.of(new BigDecimal("9876543210.123456789"))), // decimal16 + Variant.of( + EMPTY_METADATA, Variants.of(new BigDecimal("-9876543210.123456789"))), // decimal16 + Variant.of( + EMPTY_METADATA, Variants.of(ByteBuffer.wrap(new byte[] {0x0a, 0x0b, 0x0c, 0x0d}))), + Variant.of(EMPTY_METADATA, Variants.of("iceberg")), + }; + + @ParameterizedTest + @FieldSource("VARIANTS") + public void testUnshreddedValues(Variant variant) throws IOException { + Record record = RECORD.copy("id", 1, "var", variant); + + Record actual = writeAndRead((id, name) -> null, record); + + InternalTestHelpers.assertEquals(SCHEMA.asStruct(), record, actual); + } + + @ParameterizedTest + @FieldSource("VARIANTS") + public void testShreddedValues(Variant variant) throws IOException { + Record record = RECORD.copy("id", 1, "var", variant); + + Record actual = writeAndRead((id, name) -> toParquetSchema(variant.value()), record); + + InternalTestHelpers.assertEquals(SCHEMA.asStruct(), record, actual); + } + + @ParameterizedTest + @FieldSource("VARIANTS") + public void testMixedShredding(Variant variant) throws IOException { + List expected = + IntStream.range(0, VARIANTS.length) + .mapToObj(i -> RECORD.copy("id", i, "var", VARIANTS[i])) + .collect(Collectors.toList()); + + List actual = writeAndRead((id, name) -> toParquetSchema(variant.value()), expected); + + assertThat(actual.size()).isEqualTo(expected.size()); + + for (int i = 0; i < expected.size(); i += 1) { + InternalTestHelpers.assertEquals(SCHEMA.asStruct(), expected.get(i), actual.get(i)); + } + } + + private static Record writeAndRead(VariantShreddingFunction shreddingFunc, Record record) + throws IOException { + return Iterables.getOnlyElement(writeAndRead(shreddingFunc, List.of(record))); + } + + private static List writeAndRead( + VariantShreddingFunction shreddingFunc, List records) throws IOException { + 
OutputFile outputFile = new InMemoryOutputFile(); + + try (FileAppender writer = + Parquet.write(outputFile) + .schema(SCHEMA) + .variantShreddingFunc(shreddingFunc) + .createWriterFunc(fileSchema -> InternalWriter.create(SCHEMA.asStruct(), fileSchema)) + .build()) { + for (Record record : records) { + writer.add(record); + } + } + + try (CloseableIterable reader = + Parquet.read(outputFile.toInputFile()) + .project(SCHEMA) + .createReaderFunc(fileSchema -> InternalReader.create(SCHEMA, fileSchema)) + .build()) { + return Lists.newArrayList(reader); + } + } + + private Type toParquetSchema(VariantValue value) { + return VariantVisitor.visit(value, new ParquetSchemaProducer()); + } + + /** Produces a shredded Parquet typed_value schema for a variant value, or null to skip shredding. */ + private static class ParquetSchemaProducer extends VariantVisitor { + @Override + public Type object(VariantObject object, List names, List typedValues) { + if (object.numFields() < 1) { + // Parquet cannot write typed_value group with no fields + return null; + } + + List fields = Lists.newArrayList(); + int index = 0; + for (String name : names) { + Type typedValue = typedValues.get(index); + fields.add(field(name, typedValue)); + index += 1; + } + + return objectFields(fields); + } + + @Override + public Type array(VariantArray array, List elementResults) { + // fixed: was `throw null;`, which throws a bare NullPointerException; fail explicitly + // instead, matching VariantWriterBuilder.array + throw new UnsupportedOperationException("Array is not yet supported"); + } + + @Override + public Type primitive(VariantPrimitive primitive) { + switch (primitive.type()) { + case NULL: + return null; + case BOOLEAN_TRUE: + case BOOLEAN_FALSE: + return shreddedPrimitive(PrimitiveType.PrimitiveTypeName.BOOLEAN); + case INT8: + return shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, LogicalTypeAnnotation.intType(8)); + case INT16: + return shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, LogicalTypeAnnotation.intType(16)); + case INT32: + return shreddedPrimitive(PrimitiveType.PrimitiveTypeName.INT32); + case INT64: + return shreddedPrimitive(PrimitiveType.PrimitiveTypeName.INT64); + case FLOAT: + return shreddedPrimitive(PrimitiveType.PrimitiveTypeName.FLOAT); +
case DOUBLE: + return shreddedPrimitive(PrimitiveType.PrimitiveTypeName.DOUBLE); + case DECIMAL4: + BigDecimal decimal4 = (BigDecimal) primitive.get(); + return shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, + LogicalTypeAnnotation.decimalType(decimal4.scale(), 9)); + case DECIMAL8: + BigDecimal decimal8 = (BigDecimal) primitive.get(); + return shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT64, + LogicalTypeAnnotation.decimalType(decimal8.scale(), 18)); + case DECIMAL16: + BigDecimal decimal16 = (BigDecimal) primitive.get(); + return shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, + LogicalTypeAnnotation.decimalType(decimal16.scale(), 38)); + case DATE: + return shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, LogicalTypeAnnotation.dateType()); + case TIMESTAMPTZ: + return shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT64, + LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS)); + case TIMESTAMPNTZ: + return shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.INT64, + LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS)); + case BINARY: + return shreddedPrimitive(PrimitiveType.PrimitiveTypeName.BINARY); + case STRING: + return shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, LogicalTypeAnnotation.stringType()); + } + + throw new UnsupportedOperationException("Unsupported shredding type: " + primitive.type()); + } + + private static GroupType objectFields(List fields) { + GroupBuilder builder = + org.apache.parquet.schema.Types.buildGroup(Type.Repetition.OPTIONAL); + for (GroupType field : fields) { + checkField(field); + builder.addField(field); + } + + return builder.named("typed_value"); + } + + private static void checkField(GroupType fieldType) { + Preconditions.checkArgument( + fieldType.isRepetition(Type.Repetition.REQUIRED), + "Invalid field type repetition: %s should be REQUIRED", + fieldType.getRepetition()); + } + + private static 
GroupType field(String name, Type shreddedType) { + GroupBuilder builder = + org.apache.parquet.schema.Types.buildGroup(Type.Repetition.REQUIRED) + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .named("value"); + + if (shreddedType != null) { + checkShreddedType(shreddedType); + builder.addField(shreddedType); + } + + return builder.named(name); + } + + private static void checkShreddedType(Type shreddedType) { + Preconditions.checkArgument( + shreddedType.getName().equals("typed_value"), + "Invalid shredded type name: %s should be typed_value", + shreddedType.getName()); + Preconditions.checkArgument( + shreddedType.isRepetition(Type.Repetition.OPTIONAL), + "Invalid shredded type repetition: %s should be OPTIONAL", + shreddedType.getRepetition()); + } + + private static Type shreddedPrimitive(PrimitiveType.PrimitiveTypeName primitive) { + return org.apache.parquet.schema.Types.optional(primitive).named("typed_value"); + } + + private static Type shreddedPrimitive( + PrimitiveType.PrimitiveTypeName primitive, LogicalTypeAnnotation annotation) { + return org.apache.parquet.schema.Types.optional(primitive) + .as(annotation) + .named("typed_value"); + } + } +}