9 changes: 9 additions & 0 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -48,6 +48,7 @@
import org.apache.iceberg.expressions.ExpressionVisitors;
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.junit.jupiter.api.Named;
import org.junit.jupiter.params.provider.Arguments;
@@ -390,6 +391,14 @@ public static CustomRow of(Object... values) {
return new CustomRow(values);
}

[Member Author] Constructors so we can use CustomRow as a custom type in InternalData.
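Presumably the internal readers instantiate registered custom types reflectively, which is why both a no-arg and a StructType constructor are added; a rough sketch of that instantiation under this assumption (the helper name is hypothetical, not Iceberg's actual code):

// Hypothetical sketch: how a reader might construct a registered custom type.
// Prefers the StructType constructor, falling back to the no-arg constructor.
static <T extends StructLike> T newCustomInstance(Class<T> type, Types.StructType struct) {
  try {
    return type.getConstructor(Types.StructType.class).newInstance(struct);
  } catch (NoSuchMethodException e) {
    try {
      return type.getConstructor().newInstance();
    } catch (ReflectiveOperationException fallback) {
      throw new IllegalStateException("Cannot instantiate " + type.getName(), fallback);
    }
  } catch (ReflectiveOperationException e) {
    throw new IllegalStateException("Cannot instantiate " + type.getName(), e);
  }
}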

public CustomRow() {
this(new Object[0]);
}

public CustomRow(Types.StructType structType) {
this.values = new Object[structType.fields().size()];
}

private final Object[] values;

private CustomRow(Object... values) {
1 change: 1 addition & 0 deletions build.gradle
@@ -348,6 +348,7 @@ project(':iceberg-core') {
api project(':iceberg-api')
implementation project(':iceberg-common')
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
testRuntimeOnly project(':iceberg-parquet')
annotationProcessor libs.immutables.value
compileOnly libs.immutables.value

158 changes: 158 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestInternalData.java
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(ParameterizedTestExtension.class)
public class TestInternalData {
[Member Author] Currently we have some tests for these code paths in TestInternalAvro and TestInternalParquet, but those suites mostly function by piping everything through a "writeAndValidate" method, which is not conducive to adding new functionality. That method is currently used to test reuse containers and projections, but I think we may want to consider separating those tests out as well into tests that run in core if possible.


@Parameter(index = 0)
private FileFormat format;

@Parameters(name = " format = {0}")
protected static List<FileFormat> parameters() {
return Arrays.asList(FileFormat.AVRO, FileFormat.PARQUET);
}

private static final Schema SIMPLE_SCHEMA =
new Schema(
Types.NestedField.required(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "name", Types.StringType.get()));

private static final Schema NESTED_SCHEMA =
new Schema(
Types.NestedField.required(1, "outer_id", Types.LongType.get()),
Types.NestedField.optional(
2,
"nested_struct",
Types.StructType.of(
Types.NestedField.optional(3, "inner_id", Types.LongType.get()),
Types.NestedField.optional(4, "inner_name", Types.StringType.get()))));

@TempDir private Path tempDir;

private final FileIO fileIO = new TestTables.LocalFileIO();

@TestTemplate
public void testCustomRootType() throws IOException {
OutputFile outputFile = fileIO.newOutputFile(tempDir.resolve("test." + format).toString());

List<Record> testData = RandomInternalData.generate(SIMPLE_SCHEMA, 1000, 1L);

try (FileAppender<Record> appender =
InternalData.write(format, outputFile).schema(SIMPLE_SCHEMA).build()) {
appender.addAll(testData);
}

InputFile inputFile = fileIO.newInputFile(outputFile.location());
List<PartitionData> readRecords = Lists.newArrayList();

try (CloseableIterable<PartitionData> reader =
InternalData.read(format, inputFile)
.project(SIMPLE_SCHEMA)
.setRootType(PartitionData.class)
.build()) {
for (PartitionData record : reader) {
readRecords.add(record);
}
}

assertThat(readRecords).hasSameSizeAs(testData);

for (int i = 0; i < testData.size(); i++) {
Record expected = testData.get(i);
PartitionData actual = readRecords.get(i);

assertThat(actual.get(0, Long.class)).isEqualTo(expected.get(0, Long.class));
assertThat(actual.get(1, String.class)).isEqualTo(expected.get(1, String.class));
}
}

@TestTemplate
public void testCustomTypeForNestedField() throws IOException {
OutputFile outputFile = fileIO.newOutputFile(tempDir.resolve("test." + format).toString());

List<Record> testData = RandomInternalData.generate(NESTED_SCHEMA, 1000, 1L);

try (FileAppender<Record> appender =
InternalData.write(format, outputFile).schema(NESTED_SCHEMA).build()) {
appender.addAll(testData);
}

InputFile inputFile = fileIO.newInputFile(outputFile.location());
List<Record> readRecords = Lists.newArrayList();

try (CloseableIterable<Record> reader =
InternalData.read(format, inputFile)
.project(NESTED_SCHEMA)
.setCustomType(2, TestHelpers.CustomRow.class)
.build()) {
for (Record record : reader) {
readRecords.add(record);
}
}

assertThat(readRecords).hasSameSizeAs(testData);

for (int i = 0; i < testData.size(); i++) {
Record expected = testData.get(i);
Record actual = readRecords.get(i);

assertThat(actual.get(0, Long.class)).isEqualTo(expected.get(0, Long.class));

Object expectedNested = expected.get(1);
Object actualNested = actual.get(1);

if (expectedNested == null) {
// Expected nested struct is null, so actual should also be null
assertThat(actualNested).isNull();
} else {
// Expected nested struct is not null, so actual should be a CustomRow
assertThat(actualNested).isNotNull();
assertThat(actualNested)
.as("Custom type should be TestHelpers.CustomRow but was: " + actualNested.getClass())
.isInstanceOf(TestHelpers.CustomRow.class);
TestHelpers.CustomRow customRow = (TestHelpers.CustomRow) actualNested;
Record expectedRecord = (Record) expectedNested;

assertThat(customRow.get(0, Long.class))
.isEqualTo(expectedRecord.get(0, Long.class)); // inner_id
assertThat(customRow.get(1, String.class))
.isEqualTo(expectedRecord.get(1, String.class)); // inner_name
}
}
}
}
@@ -37,6 +37,6 @@ private static Parquet.WriteBuilder writeInternal(OutputFile outputFile) {
}

private static Parquet.ReadBuilder readInternal(InputFile inputFile) {
return Parquet.read(inputFile).createReaderFunc(InternalReader::create);
return Parquet.read(inputFile).createReaderFunc(InternalReader.readerFunction());
}
}
@@ -53,6 +53,9 @@
import org.apache.parquet.schema.Type;

abstract class BaseParquetReaders<T> {
// Root ID is used for the top-level struct in the Parquet Schema
protected static final int ROOT_ID = -1;
[Contributor] Magic number could use documentation.

[Member Author] Is the line above this not sufficient?


protected BaseParquetReaders() {}

protected ParquetValueReader<T> createReader(Schema expectedSchema, MessageType fileSchema) {
@@ -75,8 +78,26 @@ protected ParquetValueReader<T> createReader(
}
}

protected abstract ParquetValueReader<T> createStructReader(
List<ParquetValueReader<?>> fieldReaders, Types.StructType structType);
/**
* @deprecated will be removed in 1.12.0. Subclasses should override {@link
* #createStructReader(List, Types.StructType, Integer)} instead
*/
@Deprecated
protected ParquetValueReader<T> createStructReader(
List<ParquetValueReader<?>> fieldReaders, Types.StructType structType) {
throw new UnsupportedOperationException(
[Contributor] I'm not sure we can actually throw here. Isn't that a behavioral change for when people upgrade their Iceberg version and were somewhere using createStructReader(fieldReaders, structType)?

[Member Author (@RussellSpitzer), Sep 23, 2025] No, the method was abstract before, so they would have had to implement it. I made it concrete here so I could remove the implementations from the two child classes we currently have, rather than marking it deprecated there as well. If you check the 3-arg version, it calls the subclass's 2-arg version if not overridden, so existing behavior should be the same.

"Deprecated method is not used in this implementation, only createStructReader(list, Types.Struct, Integer) should be used");
}
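To make the compatibility note from the discussion above concrete, here is a standalone sketch of the deprecate-and-delegate pattern being applied (illustrative classes, not Iceberg's actual ones): a legacy subclass that only overrides the old signature keeps working because the new entry point falls back to it.

// Illustrative only: the deprecate-and-delegate pattern in miniature.
abstract class Base {
  /** @deprecated override create(String, Integer) instead */
  @Deprecated
  protected String create(String fields) {
    throw new UnsupportedOperationException("override create(String, Integer) instead");
  }

  // New entry point called by the framework; delegates to the deprecated
  // method so subclasses that only override the old signature still work.
  protected String create(String fields, Integer fieldId) {
    return create(fields);
  }
}

class LegacySubclass extends Base {
  @Override
  protected String create(String fields) { // old-style override, still honored
    return "reader(" + fields + ")";
  }
}

When the framework calls create(fields, fieldId) on a LegacySubclass, the default implementation reaches the old override; only a subclass overriding neither method hits the UnsupportedOperationException.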

/**
* This method can be overridden to provide a custom implementation which also uses the fieldId of
* the Schema when creating the struct reader
*/
protected ParquetValueReader<T> createStructReader(
List<ParquetValueReader<?>> fieldReaders, Types.StructType structType, Integer fieldId) {
// Fallback to the signature without fieldId if not overridden
return createStructReader(fieldReaders, structType);
}

protected abstract ParquetValueReader<?> fixedReader(ColumnDescriptor desc);

@@ -99,8 +120,9 @@ private FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
@Override
public ParquetValueReader<?> message(
Types.StructType expected, MessageType message, List<ParquetValueReader<?>> fieldReaders) {
// the top level matches by ID, but the remaining IDs are missing
return super.struct(expected, message, fieldReaders);
// The top level matches by ID, but the remaining IDs are missing
// Mark the top-level struct with the ROOT_ID
return super.struct(expected, message.withId(ROOT_ID), fieldReaders);
}

@Override
@@ -119,10 +141,15 @@ public ParquetValueReader<?> struct(
}
}

return createStructReader(newFields, expected);
return createStructReader(newFields, expected, fieldId(struct));
}
}

/** Returns the field ID from a Parquet GroupType, returning null if the ID is not set */
private static Integer fieldId(GroupType struct) {
return struct.getId() != null ? struct.getId().intValue() : null;
}

private class LogicalTypeReadBuilder
implements LogicalTypeAnnotationVisitor<ParquetValueReader<?>> {

@@ -216,14 +243,14 @@ private ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
@Override
public ParquetValueReader<?> message(
Types.StructType expected, MessageType message, List<ParquetValueReader<?>> fieldReaders) {
return struct(expected, message.asGroupType(), fieldReaders);
return struct(expected, message.asGroupType().withId(ROOT_ID), fieldReaders);
}

@Override
public ParquetValueReader<?> struct(
Types.StructType expected, GroupType struct, List<ParquetValueReader<?>> fieldReaders) {
if (null == expected) {
return createStructReader(ImmutableList.of(), null);
return createStructReader(ImmutableList.of(), null, fieldId(struct));
}

// match the expected struct's order
Expand Down Expand Up @@ -252,7 +279,7 @@ public ParquetValueReader<?> struct(
reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel));
}

return createStructReader(reorderedFields, expected);
return createStructReader(reorderedFields, expected, fieldId(struct));
}

private ParquetValueReader<?> defaultReader(
@@ -60,7 +60,7 @@ public static ParquetValueReader<Record> buildReader(

@Override
protected ParquetValueReader<Record> createStructReader(
List<ParquetValueReader<?>> fieldReaders, StructType structType) {
List<ParquetValueReader<?>> fieldReaders, StructType structType, Integer fieldId) {
return ParquetValueReaders.recordReader(fieldReaders, structType);
}

@@ -20,16 +20,22 @@

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types.StructType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;

public class InternalReader<T extends StructLike> extends BaseParquetReaders<T> {
[Member Author] I've changed this into a stateful reader so that Parquet.java can set custom classes for the reader to use. I'm definitely open to other ways around this. The previous approach was for Parquet to statically use the create method to generate a reader for "readerFunc"; this had the problem that each call to set custom types would happen after the reader func was set.
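For reference, a rough sketch of how the builder side might drive the stateful ReaderFunction defined below (projectionSchema and customTypesById are illustrative variables; the call order assumes Parquet.ReadBuilder applies its settings before building the read):

// Illustrative only: how Parquet.ReadBuilder might configure the stateful reader.
Parquet.ReadBuilder.ReaderFunction readerFn =
    InternalReader.readerFunction()
        .withSchema(projectionSchema)        // projection schema for this read
        .withCustomTypes(customTypesById)    // field ID -> StructLike subclass
        .withRootType(PartitionData.class);  // class used for the top-level struct
Function<MessageType, ParquetValueReader<?>> buildReader = readerFn.apply();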


private final Map<Integer, Class<? extends StructLike>> typesById = Maps.newHashMap();

private static final InternalReader<?> INSTANCE = new InternalReader<>();

private InternalReader() {}
@@ -46,11 +52,48 @@ public static <T extends StructLike> ParquetValueReader<T> create(
return (ParquetValueReader<T>) INSTANCE.createReader(expectedSchema, fileSchema, idToConstant);
}

public static Parquet.ReadBuilder.ReaderFunction readerFunction() {
InternalReader<?> reader = new InternalReader<>();

return new Parquet.ReadBuilder.ReaderFunction() {
private Schema schema;

@Override
public Function<MessageType, ParquetValueReader<?>> apply() {
return messageType -> reader.createReader(schema, messageType);
}

@Override
public Parquet.ReadBuilder.ReaderFunction withSchema(Schema schema) {
this.schema = schema;
return this;
}

@Override
public Parquet.ReadBuilder.ReaderFunction withCustomTypes(
[Contributor (@nastra), Sep 23, 2025] I actually like this naming better, but just wanted to point out that in the Avro reader we use setCustomTypes / setRootType / setSchema. EDIT: never mind, there isn't any direct relation between the Avro and Parquet internal readers, so having different naming here should be fine.

[Member Author] 🤷 Yeah, I'm not sure it's worth standardizing, since this is only called by Parquet.java anyway.

Map<Integer, Class<? extends StructLike>> customTypes) {
reader.typesById.putAll(customTypes);
return this;
}

@Override
public Parquet.ReadBuilder.ReaderFunction withRootType(Class<? extends StructLike> rootType) {
if (rootType != null) {
reader.typesById.put(ROOT_ID, rootType);
}

return this;
}
};
}

@Override
@SuppressWarnings("unchecked")
protected ParquetValueReader<T> createStructReader(
List<ParquetValueReader<?>> fieldReaders, StructType structType) {
return (ParquetValueReader<T>) ParquetValueReaders.recordReader(fieldReaders, structType);
List<ParquetValueReader<?>> fieldReaders, StructType structType, Integer fieldId) {
return (ParquetValueReader<T>)
ParquetValueReaders.structLikeReader(
fieldReaders, structType, typesById.getOrDefault(fieldId, Record.class));
}

@Override