diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java index a21e3752e84d..177517973526 100644 --- a/api/src/test/java/org/apache/iceberg/TestHelpers.java +++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java @@ -48,6 +48,7 @@ import org.apache.iceberg.expressions.ExpressionVisitors; import org.apache.iceberg.expressions.UnboundPredicate; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ByteBuffers; import org.junit.jupiter.api.Named; import org.junit.jupiter.params.provider.Arguments; @@ -390,6 +391,14 @@ public static CustomRow of(Object... values) { return new CustomRow(values); } + public CustomRow() { + this(new Object[0]); + } + + public CustomRow(Types.StructType structType) { + this.values = new Object[structType.fields().size()]; + } + private final Object[] values; private CustomRow(Object... values) { diff --git a/build.gradle b/build.gradle index ba15f59d5fd8..280b28354b80 100644 --- a/build.gradle +++ b/build.gradle @@ -348,6 +348,7 @@ project(':iceberg-core') { api project(':iceberg-api') implementation project(':iceberg-common') implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + testRuntimeOnly project(':iceberg-parquet') annotationProcessor libs.immutables.value compileOnly libs.immutables.value diff --git a/core/src/test/java/org/apache/iceberg/TestInternalData.java b/core/src/test/java/org/apache/iceberg/TestInternalData.java new file mode 100644 index 000000000000..ae1678e04d5f --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestInternalData.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestInternalData { + + @Parameter(index = 0) + private FileFormat format; + + @Parameters(name = " format = {0}") + protected static List parameters() { + return Arrays.asList(FileFormat.AVRO, FileFormat.PARQUET); + } + + private static final Schema SIMPLE_SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get())); + + private static final Schema NESTED_SCHEMA = + new Schema( + Types.NestedField.required(1, "outer_id", Types.LongType.get()), + Types.NestedField.optional( + 2, + "nested_struct", + Types.StructType.of( + Types.NestedField.optional(3, "inner_id", Types.LongType.get()), + Types.NestedField.optional(4, "inner_name", Types.StringType.get())))); + + @TempDir private Path tempDir; + + private final FileIO fileIO = new TestTables.LocalFileIO(); + + @TestTemplate + public void testCustomRootType() throws IOException { + OutputFile outputFile = fileIO.newOutputFile(tempDir.resolve("test." + format).toString()); + + List testData = RandomInternalData.generate(SIMPLE_SCHEMA, 1000, 1L); + + try (FileAppender appender = + InternalData.write(format, outputFile).schema(SIMPLE_SCHEMA).build()) { + appender.addAll(testData); + } + + InputFile inputFile = fileIO.newInputFile(outputFile.location()); + List readRecords = Lists.newArrayList(); + + try (CloseableIterable reader = + InternalData.read(format, inputFile) + .project(SIMPLE_SCHEMA) + .setRootType(PartitionData.class) + .build()) { + for (PartitionData record : reader) { + readRecords.add(record); + } + } + + assertThat(readRecords).hasSameSizeAs(testData); + + for (int i = 0; i < testData.size(); i++) { + Record expected = testData.get(i); + PartitionData actual = readRecords.get(i); + + assertThat(actual.get(0, Long.class)).isEqualTo(expected.get(0, Long.class)); + assertThat(actual.get(1, String.class)).isEqualTo(expected.get(1, String.class)); + } + } + + @TestTemplate + public void testCustomTypeForNestedField() throws IOException { + OutputFile outputFile = fileIO.newOutputFile(tempDir.resolve("test." + format).toString()); + + List testData = RandomInternalData.generate(NESTED_SCHEMA, 1000, 1L); + + try (FileAppender appender = + InternalData.write(format, outputFile).schema(NESTED_SCHEMA).build()) { + appender.addAll(testData); + } + + InputFile inputFile = fileIO.newInputFile(outputFile.location()); + List readRecords = Lists.newArrayList(); + + try (CloseableIterable reader = + InternalData.read(format, inputFile) + .project(NESTED_SCHEMA) + .setCustomType(2, TestHelpers.CustomRow.class) + .build()) { + for (Record record : reader) { + readRecords.add(record); + } + } + + assertThat(readRecords).hasSameSizeAs(testData); + + for (int i = 0; i < testData.size(); i++) { + Record expected = testData.get(i); + Record actual = readRecords.get(i); + + assertThat(actual.get(0, Long.class)).isEqualTo(expected.get(0, Long.class)); + + Object expectedNested = expected.get(1); + Object actualNested = actual.get(1); + + if (expectedNested == null) { + // Expected nested struct is null, so actual should also be null + assertThat(actualNested).isNull(); + } else { + // Expected nested struct is not null, so actual should be a CustomRow + assertThat(actualNested).isNotNull(); + assertThat(actualNested) + .as("Custom type should be TestHelpers.CustomRow but was: " + actualNested.getClass()) + .isInstanceOf(TestHelpers.CustomRow.class); + TestHelpers.CustomRow customRow = (TestHelpers.CustomRow) actualNested; + Record expectedRecord = (Record) expectedNested; + + assertThat(customRow.get(0, Long.class)) + .isEqualTo(expectedRecord.get(0, Long.class)); // inner_id + assertThat(customRow.get(1, String.class)) + .isEqualTo(expectedRecord.get(1, String.class)); // inner_name + } + } + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/InternalParquet.java b/parquet/src/main/java/org/apache/iceberg/InternalParquet.java index 54fac0c55179..aa5b56ca5315 100644 --- a/parquet/src/main/java/org/apache/iceberg/InternalParquet.java +++ b/parquet/src/main/java/org/apache/iceberg/InternalParquet.java @@ -37,6 +37,6 @@ private static Parquet.WriteBuilder writeInternal(OutputFile outputFile) { } private static Parquet.ReadBuilder readInternal(InputFile inputFile) { - return Parquet.read(inputFile).createReaderFunc(InternalReader::create); + return Parquet.read(inputFile).createReaderFunc(InternalReader.readerFunction()); } } diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index 8f2957e1c60d..b1fd8f43a578 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -53,6 +53,9 @@ import org.apache.parquet.schema.Type; abstract class BaseParquetReaders { + // Root ID is used for the top-level struct in the Parquet Schema + protected static final int ROOT_ID = -1; + protected BaseParquetReaders() {} protected ParquetValueReader createReader(Schema expectedSchema, MessageType fileSchema) { @@ -75,8 +78,26 @@ protected ParquetValueReader createReader( } } - protected abstract ParquetValueReader createStructReader( - List> fieldReaders, Types.StructType structType); + /** + * @deprecated will be removed in 1.12.0. Subclasses should override {@link + * #createStructReader(List, Types.StructType, Integer)} instead + */ + @Deprecated + protected ParquetValueReader createStructReader( + List> fieldReaders, Types.StructType structType) { + throw new UnsupportedOperationException( + "Deprecated method is not used in this implementation, only createStructReader(list, Types.Struct, Integer) should be used"); + } + + /** + * This method can be overridden to provide a custom implementation which also uses the fieldId of + * the Schema when creating the struct reader + */ + protected ParquetValueReader createStructReader( + List> fieldReaders, Types.StructType structType, Integer fieldId) { + // Fallback to the signature without fieldId if not overridden + return createStructReader(fieldReaders, structType); + } protected abstract ParquetValueReader fixedReader(ColumnDescriptor desc); @@ -99,8 +120,9 @@ private FallbackReadBuilder(MessageType type, Map idToConstant) { @Override public ParquetValueReader message( Types.StructType expected, MessageType message, List> fieldReaders) { - // the top level matches by ID, but the remaining IDs are missing - return super.struct(expected, message, fieldReaders); + // The top level matches by ID, but the remaining IDs are missing + // Mark the top-level struct with the ROOT_ID + return super.struct(expected, message.withId(ROOT_ID), fieldReaders); } @Override @@ -119,10 +141,15 @@ public ParquetValueReader struct( } } - return createStructReader(newFields, expected); + return createStructReader(newFields, expected, fieldId(struct)); } } + /** Returns the field ID from a Parquet GroupType, returning null if the ID is not set */ + private static Integer fieldId(GroupType struct) { + return struct.getId() != null ? struct.getId().intValue() : null; + } + private class LogicalTypeReadBuilder implements LogicalTypeAnnotationVisitor> { @@ -216,14 +243,14 @@ private ReadBuilder(MessageType type, Map idToConstant) { @Override public ParquetValueReader message( Types.StructType expected, MessageType message, List> fieldReaders) { - return struct(expected, message.asGroupType(), fieldReaders); + return struct(expected, message.asGroupType().withId(ROOT_ID), fieldReaders); } @Override public ParquetValueReader struct( Types.StructType expected, GroupType struct, List> fieldReaders) { if (null == expected) { - return createStructReader(ImmutableList.of(), null); + return createStructReader(ImmutableList.of(), null, fieldId(struct)); } // match the expected struct's order @@ -252,7 +279,7 @@ public ParquetValueReader struct( reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel)); } - return createStructReader(reorderedFields, expected); + return createStructReader(reorderedFields, expected, fieldId(struct)); } private ParquetValueReader defaultReader( diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java index 4af7ee381f61..2260a215534a 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java @@ -60,7 +60,7 @@ public static ParquetValueReader buildReader( @Override protected ParquetValueReader createStructReader( - List> fieldReaders, StructType structType) { + List> fieldReaders, StructType structType, Integer fieldId) { return ParquetValueReaders.recordReader(fieldReaders, structType); } diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java index 03585c55c9b6..44d35ef859db 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java @@ -20,16 +20,22 @@ import java.util.List; import java.util.Map; +import java.util.function.Function; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types.StructType; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.MessageType; public class InternalReader extends BaseParquetReaders { + private final Map> typesById = Maps.newHashMap(); + private static final InternalReader INSTANCE = new InternalReader<>(); private InternalReader() {} @@ -46,11 +52,48 @@ public static ParquetValueReader create( return (ParquetValueReader) INSTANCE.createReader(expectedSchema, fileSchema, idToConstant); } + public static Parquet.ReadBuilder.ReaderFunction readerFunction() { + InternalReader reader = new InternalReader<>(); + + return new Parquet.ReadBuilder.ReaderFunction() { + private Schema schema; + + @Override + public Function> apply() { + return messageType -> reader.createReader(schema, messageType); + } + + @Override + public Parquet.ReadBuilder.ReaderFunction withSchema(Schema schema) { + this.schema = schema; + return this; + } + + @Override + public Parquet.ReadBuilder.ReaderFunction withCustomTypes( + Map> customTypes) { + reader.typesById.putAll(customTypes); + return this; + } + + @Override + public Parquet.ReadBuilder.ReaderFunction withRootType(Class rootType) { + if (rootType != null) { + reader.typesById.put(ROOT_ID, rootType); + } + + return this; + } + }; + } + @Override @SuppressWarnings("unchecked") protected ParquetValueReader createStructReader( - List> fieldReaders, StructType structType) { - return (ParquetValueReader) ParquetValueReaders.recordReader(fieldReaders, structType); + List> fieldReaders, StructType structType, Integer fieldId) { + return (ParquetValueReader) + ParquetValueReaders.structLikeReader( + fieldReaders, structType, typesById.getOrDefault(fieldId, Record.class)); } @Override diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 6f68fbe150ff..3adb5584cc85 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -1151,8 +1151,7 @@ public static class ReadBuilder implements InternalData.ReadBuilder { private Expression filter = null; private ReadSupport readSupport = null; private Function> batchedReaderFunc = null; - private Function> readerFunc = null; - private BiFunction> readerFuncWithSchema = null; + private ReaderFunction readerFunction = null; private boolean filterRecords = true; private boolean caseSensitive = true; private boolean callInit = false; @@ -1161,6 +1160,61 @@ public static class ReadBuilder implements InternalData.ReadBuilder { private NameMapping nameMapping = null; private ByteBuffer fileEncryptionKey = null; private ByteBuffer fileAADPrefix = null; + private Class rootType = null; + private Map> customTypes = Maps.newHashMap(); + + public interface ReaderFunction { + Function> apply(); + + default ReaderFunction withRootType(Class rootType) { + return this; + } + + default ReaderFunction withCustomTypes( + Map> customTypes) { + return this; + } + + default ReaderFunction withSchema(Schema schema) { + return this; + } + } + + private static class UnaryReaderFunction implements ReaderFunction { + private final Function> readerFunc; + + UnaryReaderFunction(Function> readerFunc) { + this.readerFunc = readerFunc; + } + + @Override + public Function> apply() { + return readerFunc; + } + } + + private static class BinaryReaderFunction implements ReaderFunction { + private final BiFunction> readerFuncWithSchema; + private Schema schema; + + BinaryReaderFunction( + BiFunction> readerFuncWithSchema) { + this.readerFuncWithSchema = readerFuncWithSchema; + } + + @Override + public Function> apply() { + Preconditions.checkArgument( + schema != null, "Schema must be set for 2-argument reader function"); + return messageType -> readerFuncWithSchema.apply(schema, messageType); + } + + @Override + public ReaderFunction withSchema(Schema expectedSchema) { + this.schema = expectedSchema; + return this; + } + } private ReadBuilder(InputFile file) { this.file = file; @@ -1220,35 +1274,43 @@ public ReadBuilder createReaderFunc( this.batchedReaderFunc == null, "Cannot set reader function: batched reader function already set"); Preconditions.checkArgument( - this.readerFuncWithSchema == null, - "Cannot set reader function: 2-argument reader function already set"); - this.readerFunc = newReaderFunction; + this.readerFunction == null, "Cannot set reader function: reader function already set"); + this.readerFunction = new UnaryReaderFunction(newReaderFunction); return this; } public ReadBuilder createReaderFunc( BiFunction> newReaderFunction) { - Preconditions.checkArgument( - this.readerFunc == null, - "Cannot set 2-argument reader function: reader function already set"); Preconditions.checkArgument( this.batchedReaderFunc == null, - "Cannot set 2-argument reader function: batched reader function already set"); - this.readerFuncWithSchema = newReaderFunction; + "Cannot set reader function: batched reader function already set"); + Preconditions.checkArgument( + this.readerFunction == null, "Cannot set reader function: reader function already set"); + this.readerFunction = new BinaryReaderFunction(newReaderFunction); return this; } public ReadBuilder createBatchedReaderFunc(Function> func) { Preconditions.checkArgument( - this.readerFunc == null, - "Cannot set batched reader function: reader function already set"); + this.batchedReaderFunc == null, + "Cannot set batched reader function: batched reader function already set"); Preconditions.checkArgument( - this.readerFuncWithSchema == null, - "Cannot set batched reader function: 2-argument reader function already set"); + this.readerFunction == null, + "Cannot set batched reader function: ReaderFunction already set"); this.batchedReaderFunc = func; return this; } + public ReadBuilder createReaderFunc(ReaderFunction reader) { + Preconditions.checkArgument( + this.batchedReaderFunc == null, + "Cannot set reader function: batched reader function already set"); + Preconditions.checkArgument( + this.readerFunction == null, "Cannot set reader function: reader function already set"); + this.readerFunction = reader; + return this; + } + public ReadBuilder set(String key, String value) { properties.put(key, value); return this; @@ -1281,12 +1343,14 @@ public ReadBuilder withNameMapping(NameMapping newNameMapping) { @Override public ReadBuilder setRootType(Class rootClass) { - throw new UnsupportedOperationException("Custom types are not yet supported"); + rootType = rootClass; + return this; } @Override public ReadBuilder setCustomType(int fieldId, Class structClass) { - throw new UnsupportedOperationException("Custom types are not yet supported"); + customTypes.put(fieldId, structClass); + return this; } public ReadBuilder withFileEncryptionKey(ByteBuffer encryptionKey) { @@ -1315,7 +1379,7 @@ public CloseableIterable build() { Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key"); } - if (readerFunc != null || readerFuncWithSchema != null || batchedReaderFunc != null) { + if (batchedReaderFunc != null || readerFunction != null) { ParquetReadOptions.Builder optionsBuilder; if (file instanceof HadoopInputFile) { // remove read properties already set that may conflict with this read @@ -1364,9 +1428,12 @@ public CloseableIterable build() { maxRecordsPerBatch); } else { Function> readBuilder = - readerFuncWithSchema != null - ? fileType -> readerFuncWithSchema.apply(schema, fileType) - : readerFunc; + readerFunction + .withSchema(schema) + .withRootType(rootType) + .withCustomTypes(customTypes) + .apply(); + return new org.apache.iceberg.parquet.ParquetReader<>( file, schema, options, readBuilder, mapping, filter, reuseContainers, caseSensitive); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java index a5689bf43902..8aa9aa4779d9 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java @@ -30,6 +30,8 @@ import java.util.Map; import java.util.UUID; import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.common.DynConstructors; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -189,6 +191,16 @@ public static ParquetValueReader int96Timestamps(ColumnDescriptor desc) { return new TimestampInt96Reader(desc); } + @SuppressWarnings("unchecked") + public static ParquetValueReader structLikeReader( + List> readers, Types.StructType struct, Class structClass) { + if (structClass.equals(Record.class)) { + return ((ParquetValueReader) recordReader(readers, struct)); + } else { + return new StructLikeReader<>(readers, struct, structClass); + } + } + public static ParquetValueReader recordReader( List> readers, Types.StructType struct) { return new RecordReader(readers, struct); @@ -1132,6 +1144,46 @@ private TripleIterator firstNonNullColumn(List> columns) { } } + private static class StructLikeReader extends StructReader { + private final Types.StructType struct; + private final DynConstructors.Ctor ctor; + + StructLikeReader( + List> readers, Types.StructType struct, Class structLikeClass) { + super(readers); + this.struct = struct; + this.ctor = + DynConstructors.builder(StructLike.class) + .hiddenImpl(structLikeClass, Types.StructType.class) + .hiddenImpl(structLikeClass) + .build(); + } + + @Override + protected T newStructData(T reuse) { + if (reuse != null) { + return reuse; + } else { + return ctor.newInstance(struct); + } + } + + @Override + protected Object getField(T intermediate, int pos) { + return intermediate.get(pos, Object.class); + } + + @Override + protected T buildStruct(T s) { + return s; + } + + @Override + protected void set(T s, int pos, Object value) { + s.set(pos, value); + } + } + private static class RecordReader extends StructReader { private final GenericRecord template;