From 306438985713ac15170935b64f19ba01f3ab7196 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 18 Apr 2025 14:18:42 -0700 Subject: [PATCH 01/11] Core: Always project manifest first_row_id. --- core/src/main/java/org/apache/iceberg/ManifestReader.java | 3 +++ .../test/java/org/apache/iceberg/TestRowLineageAssignment.java | 3 +++ 2 files changed, 6 insertions(+) diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java index d7917eabb10c..f2ef5cb0deff 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestReader.java +++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java @@ -278,6 +278,9 @@ private CloseableIterable> open(Schema projection) { if (projection.findField(DataFile.RECORD_COUNT.fieldId()) == null) { fields.add(DataFile.RECORD_COUNT); } + if (projection.findField(DataFile.FIRST_ROW_ID.fieldId()) == null) { + fields.add(DataFile.FIRST_ROW_ID); + } fields.add(MetadataColumns.ROW_POSITION); CloseableIterable> reader = diff --git a/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java b/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java index 404e083f48d3..38bea41d7ddc 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java +++ b/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java @@ -685,6 +685,9 @@ private static void checkDataFileAssignment( try (ManifestReader reader = ManifestFiles.read(manifest, table.io(), table.specs())) { + // test that the first_row_id column is always scanned, even if not requested + reader.select(BaseScan.SCAN_COLUMNS); + for (DataFile file : reader) { assertThat(file.content()).isEqualTo(FileContent.DATA); if (index < firstRowIds.length) { From c9a4537829562b2e8553f08ab6b61c315dc53f44 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Thu, 17 Apr 2025 16:57:03 -0700 Subject: [PATCH 02/11] Spark 3.5: Implement _row_id reader. 
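This patch adds a Parquet value reader that fills in the _row_id metadata column. The rule the reader implements: a _row_id that was materialized into the data file wins; otherwise the value is derived from the data file's first_row_id plus the row's position in the file. A minimal, illustrative sketch of that rule follows (class and parameter names here are placeholders, not names added by this patch; the real implementation is the RowIdReader added to ParquetValueReaders below):

    // Sketch only: how a _row_id value is chosen for one row.
    class RowIdRule {
      // idFromFile: _row_id stored in the data file, if any (may be null)
      // firstRowId: the data file's assigned first_row_id
      // rowGroupStart: row index offset of the current Parquet row group
      // rowOffset: 0-based position of the row within the row group
      static Long rowId(Long idFromFile, long firstRowId, long rowGroupStart, long rowOffset) {
        if (idFromFile != null) {
          return idFromFile; // a materialized _row_id always takes precedence
        }
        return firstRowId + rowGroupStart + rowOffset; // otherwise derive from position
      }
    }
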
--- .../apache/iceberg/util/PartitionUtil.java | 7 +++ .../org/apache/iceberg/data/DataTest.java | 31 +++++++++++ .../apache/iceberg/data/DataTestHelpers.java | 19 ++++++- .../iceberg/data/parquet/TestGenericData.java | 34 ++++++++----- .../data/parquet/BaseParquetReaders.java | 9 +++- .../iceberg/parquet/ParquetValueReaders.java | 51 +++++++++++++++++++ .../spark/data/SparkParquetReaders.java | 9 +++- .../iceberg/spark/data/AvroDataTest.java | 41 +++++++++++++++ .../iceberg/spark/data/GenericsHelpers.java | 35 +++++++++++-- .../spark/data/TestSparkParquetReader.java | 45 +++++++++++----- 10 files changed, 247 insertions(+), 34 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java index 411d401075d6..248c11090b3f 100644 --- a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java @@ -53,6 +53,13 @@ private PartitionUtil() {} // use java.util.HashMap because partition data may contain null values Map idToConstant = Maps.newHashMap(); + // add first_row_id as _row_id + if (task.file().firstRowId() != null) { + idToConstant.put( + MetadataColumns.ROW_ID.fieldId(), + convertConstant.apply(Types.LongType.get(), task.file().firstRowId())); + } + // add _file idToConstant.put( MetadataColumns.FILE_PATH.fieldId(), diff --git a/core/src/test/java/org/apache/iceberg/data/DataTest.java b/core/src/test/java/org/apache/iceberg/data/DataTest.java index cc788e2ec078..553bc13fc046 100644 --- a/core/src/test/java/org/apache/iceberg/data/DataTest.java +++ b/core/src/test/java/org/apache/iceberg/data/DataTest.java @@ -28,9 +28,11 @@ import java.nio.ByteBuffer; import java.nio.file.Path; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -53,6 +55,10 @@ public abstract class DataTest { + protected static final long FIRST_ROW_ID = 2_000L; + protected static final Map ID_TO_CONSTANT = + Map.of(MetadataColumns.ROW_ID.fieldId(), FIRST_ROW_ID); + protected abstract void writeAndValidate(Schema schema) throws IOException; protected void writeAndValidate(Schema schema, List data) throws IOException { @@ -139,6 +145,10 @@ protected boolean supportsGeospatial() { return false; } + protected boolean supportsRowIds() { + return false; + } + @ParameterizedTest @FieldSource("SIMPLE_TYPES") public void testTypeSchema(Type type) throws IOException { @@ -599,4 +609,25 @@ public void testWriteNullValueForRequiredType() throws Exception { () -> writeAndValidate(schema, ImmutableList.of(genericRecord))); } } + + @Test + public void testRowIds() throws Exception { + Assumptions.assumeThat(supportsRowIds()).as("_row_id support is not implemented").isTrue(); + Schema schema = + new Schema( + required(1, "id", LongType.get()), + required(2, "data", Types.StringType.get()), + MetadataColumns.ROW_ID); + + GenericRecord record = GenericRecord.create(schema); + + writeAndValidate( + schema, + List.of( + record.copy(Map.of("id", 1L, "data", "a")), + record.copy(Map.of("id", 2L, "data", "b")), + record.copy(Map.of("id", 3L, "data", "c", "_row_id", 1_000L)), + record.copy(Map.of("id", 4L, "data", "d", "_row_id", 1_001L)), + record.copy(Map.of("id", 5L, "data", "e")))); 
+ } } diff --git a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java index e05afb998828..aaa4cb2d5021 100644 --- a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java +++ b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.variants.Variant; @@ -31,12 +32,26 @@ public class DataTestHelpers { private DataTestHelpers() {} public static void assertEquals(Types.StructType struct, Record expected, Record actual) { + assertEquals(struct, expected, actual, null); + } + + public static void assertEquals( + Types.StructType struct, Record expected, Record actual, Long defaultRowId) { Types.StructType expectedType = expected.struct(); for (Types.NestedField field : struct.fields()) { Types.NestedField expectedField = expectedType.field(field.fieldId()); if (expectedField != null) { - assertEquals( - field.type(), expected.getField(expectedField.name()), actual.getField(field.name())); + if (expectedField.fieldId() == MetadataColumns.ROW_ID.fieldId()) { + Long expectedRowId = (Long) expected.getField(expectedField.name()); + if (expectedRowId != null) { + assertEquals(field.type(), expectedRowId, actual.getField(field.name())); + } else { + assertEquals(field.type(), defaultRowId, actual.getField(field.name())); + } + } else { + assertEquals( + field.type(), expected.getField(expectedField.name()), actual.getField(field.name())); + } } else { assertEquals( field.type(), diff --git a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java index c663ad228c5c..d1ab0e01827c 100644 --- a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java @@ -36,8 +36,10 @@ import org.apache.iceberg.data.DataTestHelpers; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; +import org.apache.iceberg.inmemory.InMemoryOutputFile; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; @@ -61,6 +63,11 @@ protected boolean supportsTimestampNanos() { return true; } + @Override + protected boolean supportsRowIds() { + return true; + } + @Override protected void writeAndValidate(Schema schema) throws IOException { writeAndValidate(schema, schema); @@ -80,11 +87,10 @@ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throw private void writeAndValidate(Schema writeSchema, Schema expectedSchema, List expected) throws IOException { - File testFile = File.createTempFile("junit", null, temp.toFile()); - assertThat(testFile.delete()).isTrue(); + OutputFile output = new InMemoryOutputFile(); try (FileAppender appender = - Parquet.write(Files.localOutput(testFile)) + Parquet.write(output) .schema(writeSchema) .createWriterFunc(GenericParquetWriter::create) .build()) { @@ -93,30 +99,34 @@ private void writeAndValidate(Schema writeSchema, Schema expectedSchema, List rows; try (CloseableIterable reader = - Parquet.read(Files.localInput(testFile)) + 
Parquet.read(output.toInputFile()) .project(expectedSchema) .createReaderFunc( - fileSchema -> GenericParquetReaders.buildReader(expectedSchema, fileSchema)) + fileSchema -> + GenericParquetReaders.buildReader(expectedSchema, fileSchema, ID_TO_CONSTANT)) .build()) { rows = Lists.newArrayList(reader); } - for (int i = 0; i < expected.size(); i += 1) { - DataTestHelpers.assertEquals(expectedSchema.asStruct(), expected.get(i), rows.get(i)); + for (int pos = 0; pos < expected.size(); pos += 1) { + DataTestHelpers.assertEquals( + expectedSchema.asStruct(), expected.get(pos), rows.get(pos), FIRST_ROW_ID + pos); } // test reuseContainers try (CloseableIterable reader = - Parquet.read(Files.localInput(testFile)) + Parquet.read(output.toInputFile()) .project(expectedSchema) .reuseContainers() .createReaderFunc( - fileSchema -> GenericParquetReaders.buildReader(expectedSchema, fileSchema)) + fileSchema -> + GenericParquetReaders.buildReader(expectedSchema, fileSchema, ID_TO_CONSTANT)) .build()) { - int index = 0; + int pos = 0; for (Record actualRecord : reader) { - DataTestHelpers.assertEquals(expectedSchema.asStruct(), expected.get(index), actualRecord); - index += 1; + DataTestHelpers.assertEquals( + expectedSchema.asStruct(), expected.get(pos), actualRecord, FIRST_ROW_ID + pos); + pos += 1; } } } diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index 4fa2d37a6235..bb2fceac6655 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -254,7 +254,14 @@ public ParquetValueReader struct( for (Types.NestedField field : expectedFields) { int id = field.fieldId(); ParquetValueReader reader = readersById.get(id); - if (idToConstant.containsKey(id)) { + if (id == MetadataColumns.ROW_ID.fieldId()) { + Long baseRowId = (Long) idToConstant.get(id); + if (baseRowId != null) { + reorderedFields.add(ParquetValueReaders.rowIds(baseRowId, reader)); + } else { + reorderedFields.add(ParquetValueReaders.nulls()); + } + } else if (idToConstant.containsKey(id)) { // containsKey is used because the constant may be null int fieldMaxDefinitionLevel = maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java index 63aac8006e2d..fce7b9f0e891 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java @@ -161,6 +161,10 @@ public static ParquetValueReader position() { return new PositionReader(); } + public static ParquetValueReader rowIds(long baseRowId, ParquetValueReader idReader) { + return new RowIdReader(baseRowId, (ParquetValueReader) idReader); + } + public static ParquetValueReader uuids(ColumnDescriptor desc) { return new UUIDReader(desc); } @@ -322,6 +326,53 @@ public void setPageSource(PageReadStore pageStore) { } } + private static class RowIdReader implements ParquetValueReader { + private final long firstRowId; + private final ParquetValueReader idReader; + private long rowOffset = -1; + private long rowGroupStart; + + private RowIdReader(long firstRowId, ParquetValueReader idReader) { + this.firstRowId = firstRowId; + this.idReader = idReader != null ? 
idReader : nulls(); + } + + @Override + public Long read(Long reuse) { + rowOffset += 1; + + Long idFromFile = idReader.read(null); + if (idFromFile != null) { + return idFromFile; + } + + return firstRowId + rowGroupStart + rowOffset; + } + + @Override + public TripleIterator column() { + return idReader.column(); + } + + @Override + public List> columns() { + return idReader.columns(); + } + + @Override + public void setPageSource(PageReadStore pageStore) { + idReader.setPageSource(pageStore); + this.rowGroupStart = + pageStore + .getRowIndexOffset() + .orElseThrow( + () -> + new IllegalArgumentException( + "PageReadStore does not contain row index offset")); + this.rowOffset = -1; + } + } + public abstract static class PrimitiveReader implements ParquetValueReader { private final ColumnDescriptor desc; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 5a7fd50067b9..10e7605c8db0 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -162,7 +162,14 @@ public ParquetValueReader struct( for (Types.NestedField field : expectedFields) { int id = field.fieldId(); ParquetValueReader reader = readersById.get(id); - if (idToConstant.containsKey(id)) { + if (id == MetadataColumns.ROW_ID.fieldId()) { + Long baseRowId = (Long) idToConstant.get(id); + if (baseRowId != null) { + reorderedFields.add(ParquetValueReaders.rowIds(baseRowId, reader)); + } else { + reorderedFields.add(ParquetValueReaders.nulls()); + } + } else if (idToConstant.containsKey(id)) { // containsKey is used because the constant may be null int fieldMaxDefinitionLevel = maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java index 4e4000794ec4..5c700668399a 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java @@ -27,10 +27,15 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.file.Path; +import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type; @@ -51,6 +56,10 @@ public abstract class AvroDataTest { + protected static final long FIRST_ROW_ID = 2_000L; + protected static final Map ID_TO_CONSTANT = + Map.of(MetadataColumns.ROW_ID.fieldId(), FIRST_ROW_ID); + protected abstract void writeAndValidate(Schema schema) throws IOException; protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { @@ -58,6 +67,12 @@ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throw "Cannot run test, writeAndValidate(Schema, Schema) is not implemented"); } + protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List records) + throws IOException { + throw new 
UnsupportedEncodingException( + "Cannot run test, writeAndValidate(Schema, Schema, List) is not implemented"); + } + protected boolean supportsDefaultValues() { return false; } @@ -66,6 +81,10 @@ protected boolean supportsNestedTypes() { return true; } + protected boolean supportsRowIds() { + return false; + } + protected static final StructType SUPPORTED_PRIMITIVES = StructType.of( required(100, "id", LongType.get()), @@ -547,4 +566,26 @@ public void testPrimitiveTypeDefaultValues(Type.PrimitiveType type, Literal d writeAndValidate(writeSchema, readSchema); } + + @Test + public void testRowIds() throws Exception { + Assumptions.assumeThat(supportsRowIds()).as("_row_id support is not implemented").isTrue(); + Schema schema = + new Schema( + required(1, "id", LongType.get()), + required(2, "data", Types.StringType.get()), + MetadataColumns.ROW_ID); + + GenericRecord record = GenericRecord.create(schema); + + writeAndValidate( + schema, + schema, + List.of( + record.copy(Map.of("id", 1L, "data", "a")), + record.copy(Map.of("id", 2L, "data", "b")), + record.copy(Map.of("id", 3L, "data", "c", "_row_id", 1_000L)), + record.copy(Map.of("id", 4L, "data", "d", "_row_id", 1_001L)), + record.copy(Map.of("id", 5L, "data", "e")))); + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java index 501b46878bd2..c753f44d5009 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java @@ -38,6 +38,8 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.data.GenericDataUtil; import org.apache.iceberg.data.Record; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type; @@ -205,12 +207,37 @@ private static void assertEqualsSafe(Type type, Object expected, Object actual) public static void assertEqualsUnsafe( Types.StructType struct, Record expected, InternalRow actual) { + assertEqualsUnsafe(struct, expected, actual, null); + } + + public static void assertEqualsUnsafe( + Types.StructType struct, Record expected, InternalRow actual, Long defaultRowId) { + Types.StructType expectedType = expected.struct(); List fields = struct.fields(); - for (int i = 0; i < fields.size(); i += 1) { - Type fieldType = fields.get(i).type(); + for (int readPos = 0; readPos < fields.size(); readPos += 1) { + Types.NestedField field = fields.get(readPos); + Types.NestedField expectedField = expectedType.field(field.fieldId()); - Object expectedValue = expected.get(i); - Object actualValue = actual.get(i, convert(fieldType)); + Type fieldType = field.type(); + Object actualValue = + actual.isNullAt(readPos) ? 
null : actual.get(readPos, convert(fieldType)); + + Object expectedValue; + if (expectedField != null) { + if (expectedField.fieldId() == MetadataColumns.ROW_ID.fieldId()) { + Long expectedRowId = (Long) expected.getField(expectedField.name()); + if (expectedRowId != null) { + expectedValue = expectedRowId; + } else { + expectedValue = defaultRowId; + } + } else { + expectedValue = expected.getField(expectedField.name()); + } + } else { + // comparison expects Iceberg's generic representation + expectedValue = GenericDataUtil.internalToGeneric(field.type(), field.initialDefault()); + } assertEqualsUnsafe(fieldType, expectedValue, actualValue); } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java index 1cd4fccfdd3f..d157f62cf48b 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java @@ -18,18 +18,15 @@ */ package org.apache.iceberg.spark.data; -import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe; import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assumptions.assumeThat; -import java.io.File; import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.avro.generic.GenericData; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; @@ -39,11 +36,15 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.inmemory.InMemoryOutputFile; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.parquet.ParquetUtil; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -69,6 +70,13 @@ protected void writeAndValidate(Schema schema) throws IOException { @Override protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { + List expected = RandomGenericData.generate(writeSchema, 100, 0L); + writeAndValidate(writeSchema, expectedSchema, expected); + } + + @Override + protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List expected) + throws IOException { assumeThat( TypeUtil.find( writeSchema, @@ -76,30 +84,39 @@ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throw .as("Parquet Avro cannot write non-string map keys") .isNull(); - List expected = RandomData.generateList(writeSchema, 100, 0L); - - File testFile = File.createTempFile("junit", null, temp.toFile()); - assertThat(testFile.delete()).as("Delete should succeed").isTrue(); - - try (FileAppender writer = - Parquet.write(Files.localOutput(testFile)).schema(writeSchema).named("test").build()) { + OutputFile output = new InMemoryOutputFile(); + try (FileAppender writer = + 
Parquet.write(output) + .schema(writeSchema) + .createWriterFunc(GenericParquetWriter::create) + .named("test") + .build()) { writer.addAll(expected); } try (CloseableIterable reader = - Parquet.read(Files.localInput(testFile)) + Parquet.read(output.toInputFile()) .project(expectedSchema) - .createReaderFunc(type -> SparkParquetReaders.buildReader(expectedSchema, type)) + .createReaderFunc( + type -> SparkParquetReaders.buildReader(expectedSchema, type, ID_TO_CONSTANT)) .build()) { Iterator rows = reader.iterator(); - for (GenericData.Record record : expected) { + int pos = 0; + for (Record record : expected) { assertThat(rows).as("Should have expected number of rows").hasNext(); - assertEqualsUnsafe(expectedSchema.asStruct(), record, rows.next()); + GenericsHelpers.assertEqualsUnsafe( + expectedSchema.asStruct(), record, rows.next(), FIRST_ROW_ID + pos); + pos += 1; } assertThat(rows).as("Should not have extra rows").isExhausted(); } } + @Override + protected boolean supportsRowIds() { + return true; + } + @Override protected boolean supportsDefaultValues() { return true; From 48bee89f8ed876f97e725aa9c321f3c2907e68a4 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Thu, 17 Apr 2025 18:46:09 -0700 Subject: [PATCH 03/11] Spark: Implement last updated seq reader. --- .../apache/iceberg/util/PartitionUtil.java | 4 ++ .../org/apache/iceberg/data/DataTest.java | 31 ++++++++++---- .../apache/iceberg/data/DataTestHelpers.java | 33 ++++++++++----- .../iceberg/data/parquet/TestGenericData.java | 6 +-- .../data/parquet/BaseParquetReaders.java | 8 ++++ .../iceberg/parquet/ParquetValueReaders.java | 42 +++++++++++++++++++ .../spark/data/SparkParquetReaders.java | 8 ++++ .../iceberg/spark/data/AvroDataTest.java | 31 ++++++++++---- .../iceberg/spark/data/GenericsHelpers.java | 26 ++++++++---- .../spark/data/TestSparkParquetReader.java | 4 +- 10 files changed, 156 insertions(+), 37 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java index 248c11090b3f..ad6ef605420a 100644 --- a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java @@ -60,6 +60,10 @@ private PartitionUtil() {} convertConstant.apply(Types.LongType.get(), task.file().firstRowId())); } + idToConstant.put( + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), + convertConstant.apply(Types.LongType.get(), task.file().fileSequenceNumber())); + // add _file idToConstant.put( MetadataColumns.FILE_PATH.fieldId(), diff --git a/core/src/test/java/org/apache/iceberg/data/DataTest.java b/core/src/test/java/org/apache/iceberg/data/DataTest.java index 553bc13fc046..f28022cc792e 100644 --- a/core/src/test/java/org/apache/iceberg/data/DataTest.java +++ b/core/src/test/java/org/apache/iceberg/data/DataTest.java @@ -55,9 +55,13 @@ public abstract class DataTest { - protected static final long FIRST_ROW_ID = 2_000L; + private static final long FIRST_ROW_ID = 2_000L; protected static final Map ID_TO_CONSTANT = - Map.of(MetadataColumns.ROW_ID.fieldId(), FIRST_ROW_ID); + Map.of( + MetadataColumns.ROW_ID.fieldId(), + FIRST_ROW_ID, + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), + 34L); protected abstract void writeAndValidate(Schema schema) throws IOException; @@ -145,7 +149,7 @@ protected boolean supportsGeospatial() { return false; } - protected boolean supportsRowIds() { + protected boolean supportsRowLineage() { return false; } @@ -611,13 +615,17 @@ public void 
testWriteNullValueForRequiredType() throws Exception { } @Test - public void testRowIds() throws Exception { - Assumptions.assumeThat(supportsRowIds()).as("_row_id support is not implemented").isTrue(); + public void testRowLineage() throws Exception { + Assumptions.assumeThat(supportsRowLineage()) + .as("Row Lineage support is not implemented") + .isTrue(); + Schema schema = new Schema( required(1, "id", LongType.get()), required(2, "data", Types.StringType.get()), - MetadataColumns.ROW_ID); + MetadataColumns.ROW_ID, + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER); GenericRecord record = GenericRecord.create(schema); @@ -626,7 +634,16 @@ public void testRowIds() throws Exception { List.of( record.copy(Map.of("id", 1L, "data", "a")), record.copy(Map.of("id", 2L, "data", "b")), - record.copy(Map.of("id", 3L, "data", "c", "_row_id", 1_000L)), + record.copy( + Map.of( + "id", + 3L, + "data", + "c", + "_row_id", + 1_000L, + "_last_updated_sequence_number", + 33L)), record.copy(Map.of("id", 4L, "data", "d", "_row_id", 1_001L)), record.copy(Map.of("id", 5L, "data", "e")))); } diff --git a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java index aaa4cb2d5021..fc8d47680b0f 100644 --- a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java +++ b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java @@ -32,26 +32,39 @@ public class DataTestHelpers { private DataTestHelpers() {} public static void assertEquals(Types.StructType struct, Record expected, Record actual) { - assertEquals(struct, expected, actual, null); + assertEquals(struct, expected, actual, null, -1); } public static void assertEquals( - Types.StructType struct, Record expected, Record actual, Long defaultRowId) { + Types.StructType struct, + Record expected, + Record actual, + Map idToConstant, + int pos) { Types.StructType expectedType = expected.struct(); for (Types.NestedField field : struct.fields()) { Types.NestedField expectedField = expectedType.field(field.fieldId()); + Object expectedValue; if (expectedField != null) { - if (expectedField.fieldId() == MetadataColumns.ROW_ID.fieldId()) { - Long expectedRowId = (Long) expected.getField(expectedField.name()); - if (expectedRowId != null) { - assertEquals(field.type(), expectedRowId, actual.getField(field.name())); - } else { - assertEquals(field.type(), defaultRowId, actual.getField(field.name())); + int id = expectedField.fieldId(); + if (id == MetadataColumns.ROW_ID.fieldId()) { + expectedValue = expected.getField(expectedField.name()); + if (expectedValue == null && idToConstant != null) { + expectedValue = (Long) idToConstant.get(id) + pos; } + + } else if (id == MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId()) { + expectedValue = expected.getField(expectedField.name()); + if (expectedValue == null && idToConstant != null) { + expectedValue = idToConstant.get(id); + } + } else { - assertEquals( - field.type(), expected.getField(expectedField.name()), actual.getField(field.name())); + expectedValue = expected.getField(expectedField.name()); } + + assertEquals(field.type(), expectedValue, actual.getField(field.name())); + } else { assertEquals( field.type(), diff --git a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java index d1ab0e01827c..1b5917e97296 100644 --- a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java +++ 
b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java @@ -64,7 +64,7 @@ protected boolean supportsTimestampNanos() { } @Override - protected boolean supportsRowIds() { + protected boolean supportsRowLineage() { return true; } @@ -110,7 +110,7 @@ private void writeAndValidate(Schema writeSchema, Schema expectedSchema, List struct( } else { reorderedFields.add(ParquetValueReaders.nulls()); } + } else if (id == MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId()) { + Long baseRowId = (Long) idToConstant.get(id); + Long fileSeqNumber = (Long) idToConstant.get(id); + if (fileSeqNumber != null && baseRowId != null) { + reorderedFields.add(ParquetValueReaders.lastUpdated(fileSeqNumber, reader)); + } else { + reorderedFields.add(ParquetValueReaders.nulls()); + } } else if (idToConstant.containsKey(id)) { // containsKey is used because the constant may be null int fieldMaxDefinitionLevel = diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java index fce7b9f0e891..4a66d89c5daf 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java @@ -161,10 +161,17 @@ public static ParquetValueReader position() { return new PositionReader(); } + @SuppressWarnings("unchecked") public static ParquetValueReader rowIds(long baseRowId, ParquetValueReader idReader) { return new RowIdReader(baseRowId, (ParquetValueReader) idReader); } + @SuppressWarnings("unchecked") + public static ParquetValueReader lastUpdated( + long fileLastUpdated, ParquetValueReader seqReader) { + return new LastUpdatedSeqReader(fileLastUpdated, (ParquetValueReader) seqReader); + } + public static ParquetValueReader uuids(ColumnDescriptor desc) { return new UUIDReader(desc); } @@ -373,6 +380,41 @@ public void setPageSource(PageReadStore pageStore) { } } + private static class LastUpdatedSeqReader implements ParquetValueReader { + private final long fileLastUpdated; + private final ParquetValueReader seqReader; + + private LastUpdatedSeqReader(long fileLastUpdated, ParquetValueReader seqReader) { + this.fileLastUpdated = fileLastUpdated; + this.seqReader = seqReader != null ? 
seqReader : nulls(); + } + + @Override + public Long read(Long reuse) { + Long rowLastUpdated = seqReader.read(null); + if (rowLastUpdated != null) { + return rowLastUpdated; + } + + return fileLastUpdated; + } + + @Override + public TripleIterator column() { + return seqReader.column(); + } + + @Override + public List> columns() { + return seqReader.columns(); + } + + @Override + public void setPageSource(PageReadStore pageStore) { + seqReader.setPageSource(pageStore); + } + } + public abstract static class PrimitiveReader implements ParquetValueReader { private final ColumnDescriptor desc; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 10e7605c8db0..bb76905dd225 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -169,6 +169,14 @@ public ParquetValueReader struct( } else { reorderedFields.add(ParquetValueReaders.nulls()); } + } else if (id == MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId()) { + Long baseRowId = (Long) idToConstant.get(id); + Long fileSeqNumber = (Long) idToConstant.get(id); + if (fileSeqNumber != null && baseRowId != null) { + reorderedFields.add(ParquetValueReaders.lastUpdated(fileSeqNumber, reader)); + } else { + reorderedFields.add(ParquetValueReaders.nulls()); + } } else if (idToConstant.containsKey(id)) { // containsKey is used because the constant may be null int fieldMaxDefinitionLevel = diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java index 5c700668399a..a31138ae01a2 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java @@ -56,9 +56,13 @@ public abstract class AvroDataTest { - protected static final long FIRST_ROW_ID = 2_000L; + private static final long FIRST_ROW_ID = 2_000L; protected static final Map ID_TO_CONSTANT = - Map.of(MetadataColumns.ROW_ID.fieldId(), FIRST_ROW_ID); + Map.of( + MetadataColumns.ROW_ID.fieldId(), + FIRST_ROW_ID, + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), + 34L); protected abstract void writeAndValidate(Schema schema) throws IOException; @@ -81,7 +85,7 @@ protected boolean supportsNestedTypes() { return true; } - protected boolean supportsRowIds() { + protected boolean supportsRowLineage() { return false; } @@ -568,13 +572,17 @@ public void testPrimitiveTypeDefaultValues(Type.PrimitiveType type, Literal d } @Test - public void testRowIds() throws Exception { - Assumptions.assumeThat(supportsRowIds()).as("_row_id support is not implemented").isTrue(); + public void testRowLineage() throws Exception { + Assumptions.assumeThat(supportsRowLineage()) + .as("Row lineage support is not implemented") + .isTrue(); + Schema schema = new Schema( required(1, "id", LongType.get()), required(2, "data", Types.StringType.get()), - MetadataColumns.ROW_ID); + MetadataColumns.ROW_ID, + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER); GenericRecord record = GenericRecord.create(schema); @@ -584,7 +592,16 @@ public void testRowIds() throws Exception { List.of( record.copy(Map.of("id", 1L, "data", "a")), record.copy(Map.of("id", 2L, "data", "b")), - record.copy(Map.of("id", 3L, "data", "c", "_row_id", 1_000L)), + 
record.copy( + Map.of( + "id", + 3L, + "data", + "c", + "_row_id", + 1_000L, + "_last_updated_sequence_number", + 33L)), record.copy(Map.of("id", 4L, "data", "d", "_row_id", 1_001L)), record.copy(Map.of("id", 5L, "data", "e")))); } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java index c753f44d5009..43cecd0473fd 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java @@ -207,11 +207,15 @@ private static void assertEqualsSafe(Type type, Object expected, Object actual) public static void assertEqualsUnsafe( Types.StructType struct, Record expected, InternalRow actual) { - assertEqualsUnsafe(struct, expected, actual, null); + assertEqualsUnsafe(struct, expected, actual, null, -1); } public static void assertEqualsUnsafe( - Types.StructType struct, Record expected, InternalRow actual, Long defaultRowId) { + Types.StructType struct, + Record expected, + InternalRow actual, + Map idToConstant, + int pos) { Types.StructType expectedType = expected.struct(); List fields = struct.fields(); for (int readPos = 0; readPos < fields.size(); readPos += 1) { @@ -224,13 +228,19 @@ public static void assertEqualsUnsafe( Object expectedValue; if (expectedField != null) { - if (expectedField.fieldId() == MetadataColumns.ROW_ID.fieldId()) { - Long expectedRowId = (Long) expected.getField(expectedField.name()); - if (expectedRowId != null) { - expectedValue = expectedRowId; - } else { - expectedValue = defaultRowId; + int id = expectedField.fieldId(); + if (id == MetadataColumns.ROW_ID.fieldId()) { + expectedValue = expected.getField(expectedField.name()); + if (expectedValue == null && idToConstant != null) { + expectedValue = (Long) idToConstant.get(id) + pos; } + + } else if (id == MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId()) { + expectedValue = expected.getField(expectedField.name()); + if (expectedValue == null && idToConstant != null) { + expectedValue = idToConstant.get(id); + } + } else { expectedValue = expected.getField(expectedField.name()); } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java index d157f62cf48b..210e901bf6c0 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java @@ -105,7 +105,7 @@ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List< for (Record record : expected) { assertThat(rows).as("Should have expected number of rows").hasNext(); GenericsHelpers.assertEqualsUnsafe( - expectedSchema.asStruct(), record, rows.next(), FIRST_ROW_ID + pos); + expectedSchema.asStruct(), record, rows.next(), ID_TO_CONSTANT, pos); pos += 1; } assertThat(rows).as("Should not have extra rows").isExhausted(); @@ -113,7 +113,7 @@ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List< } @Override - protected boolean supportsRowIds() { + protected boolean supportsRowLineage() { return true; } From 492259d1e9195fdf2b21f009a5e5eba396087c00 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 18 Apr 2025 10:08:59 -0700 Subject: [PATCH 04/11] Remove unnecessary DL logic for constants. 
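Constant readers injected for metadata columns and default values never read a real column, so the per-field max-definition-level bookkeeping was unnecessary; a non-null constant only needs to look "present" whenever the enclosing struct is non-null, which the struct's own definition level already expresses. A rough sketch of the column a non-null constant can expose (hypothetical name; the change below adds this idea as ConstantDLColumn inside ConstantReader):

    // Sketch only: a column that always reports the parent struct's definition level.
    class FixedDefinitionLevelColumn {
      private final int definitionLevel; // the enclosing struct's max definition level

      FixedDefinitionLevelColumn(int parentDefinitionLevel) {
        this.definitionLevel = parentDefinitionLevel;
      }

      int currentDefinitionLevel() {
        return definitionLevel; // read as non-null whenever the struct is non-null
      }
    }
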
--- .../data/parquet/BaseParquetReaders.java | 15 ++-- .../iceberg/parquet/ParquetValueReaders.java | 71 +++++++++++-------- .../spark/data/SparkParquetReaders.java | 14 ++-- 3 files changed, 49 insertions(+), 51 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index 68290ea7d7f7..9aca2848a8ec 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -228,7 +228,6 @@ public ParquetValueReader struct( // match the expected struct's order Map> readersById = Maps.newHashMap(); Map typesById = Maps.newHashMap(); - Map maxDefinitionLevelsById = Maps.newHashMap(); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { ParquetValueReader fieldReader = fieldReaders.get(i); @@ -238,9 +237,6 @@ public ParquetValueReader struct( int id = fieldType.getId().intValue(); readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReader)); typesById.put(id, fieldType); - if (idToConstant.containsKey(id)) { - maxDefinitionLevelsById.put(id, fieldD); - } } } @@ -249,8 +245,8 @@ public ParquetValueReader struct( List> reorderedFields = Lists.newArrayListWithExpectedSize(expectedFields.size()); List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); - // Defaulting to parent max definition level - int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); + // use the struct's DL for constants that are always non-null if the struct is non-null + int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); for (Types.NestedField field : expectedFields) { int id = field.fieldId(); ParquetValueReader reader = readersById.get(id); @@ -271,10 +267,8 @@ public ParquetValueReader struct( } } else if (idToConstant.containsKey(id)) { // containsKey is used because the constant may be null - int fieldMaxDefinitionLevel = - maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel); reorderedFields.add( - ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel)); + ParquetValueReaders.constant(idToConstant.get(id), constantDefinitionLevel)); types.add(null); } else if (id == MetadataColumns.ROW_POSITION.fieldId()) { reorderedFields.add(ParquetValueReaders.position()); @@ -288,8 +282,7 @@ public ParquetValueReader struct( } else if (field.initialDefault() != null) { reorderedFields.add( ParquetValueReaders.constant( - convertConstant(field.type(), field.initialDefault()), - maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel))); + convertConstant(field.type(), field.initialDefault()), constantDefinitionLevel)); types.add(typesById.get(id)); } else if (field.isOptional()) { reorderedFields.add(ParquetValueReaders.nulls()); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java index 4a66d89c5daf..8388831ca6b3 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java @@ -248,37 +248,15 @@ private static class ConstantReader implements ParquetValueReader { this.children = NullReader.COLUMNS; } - ConstantReader(C constantValue, int definitionLevel) { + ConstantReader(C constantValue, int parentDl) { this.constantValue = constantValue; - 
this.column = - new TripleIterator() { - @Override - public int currentDefinitionLevel() { - return definitionLevel; - } - - @Override - public int currentRepetitionLevel() { - return 0; - } - - @Override - public N nextNull() { - return null; - } - - @Override - public boolean hasNext() { - return false; - } - - @Override - public Object next() { - return null; - } - }; - - this.children = ImmutableList.of(column); + if (constantValue != null) { + this.column = new ConstantDLColumn<>(parentDl); + this.children = ImmutableList.of(column); + } else { + this.column = NullReader.NULL_COLUMN; + this.children = NullReader.COLUMNS; + } } @Override @@ -298,6 +276,39 @@ public List> columns() { @Override public void setPageSource(PageReadStore pageStore) {} + + private static class ConstantDLColumn implements TripleIterator { + private final int definitionLevel; + + private ConstantDLColumn(int definitionLevel) { + this.definitionLevel = definitionLevel; + } + + @Override + public int currentDefinitionLevel() { + return definitionLevel; + } + + @Override + public int currentRepetitionLevel() { + return 0; + } + + @Override + public N nextNull() { + return null; + } + + @Override + public boolean hasNext() { + return false; + } + + @Override + public T next() { + return null; + } + } } private static class PositionReader implements ParquetValueReader { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index bb76905dd225..e1e3fc3051c3 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -138,7 +138,6 @@ public ParquetValueReader struct( // match the expected struct's order Map> readersById = Maps.newHashMap(); Map typesById = Maps.newHashMap(); - Map maxDefinitionLevelsById = Maps.newHashMap(); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { Type fieldType = fields.get(i); @@ -147,9 +146,6 @@ public ParquetValueReader struct( int id = fieldType.getId().intValue(); readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i))); typesById.put(id, fieldType); - if (idToConstant.containsKey(id)) { - maxDefinitionLevelsById.put(id, fieldD); - } } } @@ -157,8 +153,8 @@ public ParquetValueReader struct( expected != null ? 
expected.fields() : ImmutableList.of(); List> reorderedFields = Lists.newArrayListWithExpectedSize(expectedFields.size()); - // Defaulting to parent max definition level - int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); + // use the struct's DL for constants that are always non-null if the struct is non-null + int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); for (Types.NestedField field : expectedFields) { int id = field.fieldId(); ParquetValueReader reader = readersById.get(id); @@ -179,10 +175,8 @@ public ParquetValueReader struct( } } else if (idToConstant.containsKey(id)) { // containsKey is used because the constant may be null - int fieldMaxDefinitionLevel = - maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel); reorderedFields.add( - ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel)); + ParquetValueReaders.constant(idToConstant.get(id), constantDefinitionLevel)); } else if (id == MetadataColumns.ROW_POSITION.fieldId()) { reorderedFields.add(ParquetValueReaders.position()); } else if (id == MetadataColumns.IS_DELETED.fieldId()) { @@ -193,7 +187,7 @@ public ParquetValueReader struct( reorderedFields.add( ParquetValueReaders.constant( SparkUtil.internalToSpark(field.type(), field.initialDefault()), - maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel))); + constantDefinitionLevel)); } else if (field.isOptional()) { reorderedFields.add(ParquetValueReaders.nulls()); } else { From 4ae7dbd20ede5997af22c5b215d1963026cf3f8d Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 18 Apr 2025 10:47:59 -0700 Subject: [PATCH 05/11] Reduce complexity of struct method in read builders. --- .../data/parquet/BaseParquetReaders.java | 83 +++++++------------ .../data/parquet/GenericParquetReaders.java | 3 +- .../iceberg/data/parquet/InternalReader.java | 3 +- .../iceberg/parquet/ParquetValueReaders.java | 38 ++++++++- .../spark/data/SparkParquetReaders.java | 69 ++++++--------- 5 files changed, 89 insertions(+), 107 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index 9aca2848a8ec..8f2957e1c60d 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; @@ -77,7 +76,7 @@ protected ParquetValueReader createReader( } protected abstract ParquetValueReader createStructReader( - List types, List> fieldReaders, Types.StructType structType); + List> fieldReaders, Types.StructType structType); protected abstract ParquetValueReader fixedReader(ColumnDescriptor desc); @@ -110,7 +109,6 @@ public ParquetValueReader struct( // the expected struct is ignored because nested fields are never found when the List> newFields = Lists.newArrayListWithExpectedSize(fieldReaders.size()); - List types = Lists.newArrayListWithExpectedSize(fieldReaders.size()); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { ParquetValueReader fieldReader = fieldReaders.get(i); @@ -118,11 +116,10 @@ public ParquetValueReader struct( Type fieldType = fields.get(i); int fieldD = 
type().getMaxDefinitionLevel(path(fieldType.getName())) - 1; newFields.add(ParquetValueReaders.option(fieldType, fieldD, fieldReader)); - types.add(fieldType); } } - return createStructReader(types, newFields, expected); + return createStructReader(newFields, expected); } } @@ -225,9 +222,12 @@ public ParquetValueReader message( @Override public ParquetValueReader struct( Types.StructType expected, GroupType struct, List> fieldReaders) { + if (null == expected) { + return createStructReader(ImmutableList.of(), null); + } + // match the expected struct's order Map> readersById = Maps.newHashMap(); - Map typesById = Maps.newHashMap(); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { ParquetValueReader fieldReader = fieldReaders.get(i); @@ -236,64 +236,37 @@ public ParquetValueReader struct( int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1; int id = fieldType.getId().intValue(); readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReader)); - typesById.put(id, fieldType); } } - List expectedFields = - expected != null ? expected.fields() : ImmutableList.of(); + int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); + List expectedFields = expected.fields(); List> reorderedFields = Lists.newArrayListWithExpectedSize(expectedFields.size()); - List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); - // use the struct's DL for constants that are always non-null if the struct is non-null - int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); + for (Types.NestedField field : expectedFields) { int id = field.fieldId(); - ParquetValueReader reader = readersById.get(id); - if (id == MetadataColumns.ROW_ID.fieldId()) { - Long baseRowId = (Long) idToConstant.get(id); - if (baseRowId != null) { - reorderedFields.add(ParquetValueReaders.rowIds(baseRowId, reader)); - } else { - reorderedFields.add(ParquetValueReaders.nulls()); - } - } else if (id == MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId()) { - Long baseRowId = (Long) idToConstant.get(id); - Long fileSeqNumber = (Long) idToConstant.get(id); - if (fileSeqNumber != null && baseRowId != null) { - reorderedFields.add(ParquetValueReaders.lastUpdated(fileSeqNumber, reader)); - } else { - reorderedFields.add(ParquetValueReaders.nulls()); - } - } else if (idToConstant.containsKey(id)) { - // containsKey is used because the constant may be null - reorderedFields.add( - ParquetValueReaders.constant(idToConstant.get(id), constantDefinitionLevel)); - types.add(null); - } else if (id == MetadataColumns.ROW_POSITION.fieldId()) { - reorderedFields.add(ParquetValueReaders.position()); - types.add(null); - } else if (id == MetadataColumns.IS_DELETED.fieldId()) { - reorderedFields.add(ParquetValueReaders.constant(false)); - types.add(null); - } else if (reader != null) { - reorderedFields.add(reader); - types.add(typesById.get(id)); - } else if (field.initialDefault() != null) { - reorderedFields.add( - ParquetValueReaders.constant( - convertConstant(field.type(), field.initialDefault()), constantDefinitionLevel)); - types.add(typesById.get(id)); - } else if (field.isOptional()) { - reorderedFields.add(ParquetValueReaders.nulls()); - types.add(null); - } else { - throw new IllegalArgumentException( - String.format("Missing required field: %s", field.name())); - } + ParquetValueReader reader = + ParquetValueReaders.replaceWithMetadataReader( + id, readersById.get(id), idToConstant, constantDefinitionLevel); + 
reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel)); + } + + return createStructReader(reorderedFields, expected); + } + + private ParquetValueReader defaultReader( + Types.NestedField field, ParquetValueReader reader, int constantDL) { + if (reader != null) { + return reader; + } else if (field.initialDefault() != null) { + return ParquetValueReaders.constant( + convertConstant(field.type(), field.initialDefault()), constantDL); + } else if (field.isOptional()) { + return ParquetValueReaders.nulls(); } - return createStructReader(types, reorderedFields, expected); + throw new IllegalArgumentException(String.format("Missing required field: %s", field.name())); } @Override diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java index e12f379b36bb..4af7ee381f61 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java @@ -41,7 +41,6 @@ import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type; public class GenericParquetReaders extends BaseParquetReaders { @@ -61,7 +60,7 @@ public static ParquetValueReader buildReader( @Override protected ParquetValueReader createStructReader( - List types, List> fieldReaders, StructType structType) { + List> fieldReaders, StructType structType) { return ParquetValueReaders.recordReader(fieldReaders, structType); } diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java index 05613eb1de16..03585c55c9b6 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java @@ -27,7 +27,6 @@ import org.apache.iceberg.types.Types.StructType; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.Type; public class InternalReader extends BaseParquetReaders { @@ -50,7 +49,7 @@ public static ParquetValueReader create( @Override @SuppressWarnings("unchecked") protected ParquetValueReader createStructReader( - List types, List> fieldReaders, StructType structType) { + List> fieldReaders, StructType structType) { return (ParquetValueReader) ParquetValueReaders.recordReader(fieldReaders, structType); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java index 8388831ca6b3..a4a5d040c183 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -162,14 +163,22 @@ public static ParquetValueReader position() { } @SuppressWarnings("unchecked") - public static ParquetValueReader rowIds(long baseRowId, ParquetValueReader idReader) { - return new RowIdReader(baseRowId, (ParquetValueReader) idReader); + public 
static ParquetValueReader rowIds(Long baseRowId, ParquetValueReader idReader) { + if (baseRowId != null) { + return new RowIdReader(baseRowId, (ParquetValueReader) idReader); + } else { + return ParquetValueReaders.nulls(); + } } @SuppressWarnings("unchecked") public static ParquetValueReader lastUpdated( - long fileLastUpdated, ParquetValueReader seqReader) { - return new LastUpdatedSeqReader(fileLastUpdated, (ParquetValueReader) seqReader); + Long baseRowId, Long fileLastUpdated, ParquetValueReader seqReader) { + if (fileLastUpdated != null && baseRowId != null) { + return new LastUpdatedSeqReader(fileLastUpdated, (ParquetValueReader) seqReader); + } else { + return ParquetValueReaders.nulls(); + } } public static ParquetValueReader uuids(ColumnDescriptor desc) { @@ -185,6 +194,27 @@ public static ParquetValueReader recordReader( return new RecordReader(readers, struct); } + public static ParquetValueReader replaceWithMetadataReader( + int id, ParquetValueReader reader, Map idToConstant, int constantDL) { + if (id == MetadataColumns.ROW_ID.fieldId()) { + Long baseRowId = (Long) idToConstant.get(id); + return ParquetValueReaders.rowIds(baseRowId, reader); + } else if (id == MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId()) { + Long baseRowId = (Long) idToConstant.get(id); + Long fileSeqNumber = (Long) idToConstant.get(id); + return ParquetValueReaders.lastUpdated(baseRowId, fileSeqNumber, reader); + } else if (idToConstant.containsKey(id)) { + // containsKey is used because the constant may be null + return ParquetValueReaders.constant(idToConstant.get(id), constantDL); + } else if (id == MetadataColumns.ROW_POSITION.fieldId()) { + return ParquetValueReaders.position(); + } else if (id == MetadataColumns.IS_DELETED.fieldId()) { + return ParquetValueReaders.constant(false, constantDL); + } + + return reader; + } + private static class NullReader implements ParquetValueReader { private static final NullReader INSTANCE = new NullReader<>(); private static final ImmutableList> COLUMNS = ImmutableList.of(); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index e1e3fc3051c3..87c97cc7a663 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -24,7 +24,6 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; @@ -135,9 +134,12 @@ public ParquetValueReader message( @Override public ParquetValueReader struct( Types.StructType expected, GroupType struct, List> fieldReaders) { + if (null == expected) { + return new InternalRowReader(ImmutableList.of()); + } + // match the expected struct's order Map> readersById = Maps.newHashMap(); - Map typesById = Maps.newHashMap(); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { Type fieldType = fields.get(i); @@ -145,60 +147,39 @@ public ParquetValueReader struct( if (fieldType.getId() != null) { int id = fieldType.getId().intValue(); readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i))); - typesById.put(id, fieldType); } } - List expectedFields = - expected != null ? 
expected.fields() : ImmutableList.of(); + int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); + List expectedFields = expected.fields(); List> reorderedFields = Lists.newArrayListWithExpectedSize(expectedFields.size()); - // use the struct's DL for constants that are always non-null if the struct is non-null - int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); + for (Types.NestedField field : expectedFields) { int id = field.fieldId(); - ParquetValueReader reader = readersById.get(id); - if (id == MetadataColumns.ROW_ID.fieldId()) { - Long baseRowId = (Long) idToConstant.get(id); - if (baseRowId != null) { - reorderedFields.add(ParquetValueReaders.rowIds(baseRowId, reader)); - } else { - reorderedFields.add(ParquetValueReaders.nulls()); - } - } else if (id == MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId()) { - Long baseRowId = (Long) idToConstant.get(id); - Long fileSeqNumber = (Long) idToConstant.get(id); - if (fileSeqNumber != null && baseRowId != null) { - reorderedFields.add(ParquetValueReaders.lastUpdated(fileSeqNumber, reader)); - } else { - reorderedFields.add(ParquetValueReaders.nulls()); - } - } else if (idToConstant.containsKey(id)) { - // containsKey is used because the constant may be null - reorderedFields.add( - ParquetValueReaders.constant(idToConstant.get(id), constantDefinitionLevel)); - } else if (id == MetadataColumns.ROW_POSITION.fieldId()) { - reorderedFields.add(ParquetValueReaders.position()); - } else if (id == MetadataColumns.IS_DELETED.fieldId()) { - reorderedFields.add(ParquetValueReaders.constant(false)); - } else if (reader != null) { - reorderedFields.add(reader); - } else if (field.initialDefault() != null) { - reorderedFields.add( - ParquetValueReaders.constant( - SparkUtil.internalToSpark(field.type(), field.initialDefault()), - constantDefinitionLevel)); - } else if (field.isOptional()) { - reorderedFields.add(ParquetValueReaders.nulls()); - } else { - throw new IllegalArgumentException( - String.format("Missing required field: %s", field.name())); - } + ParquetValueReader reader = + ParquetValueReaders.replaceWithMetadataReader( + id, readersById.get(id), idToConstant, constantDefinitionLevel); + reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel)); } return new InternalRowReader(reorderedFields); } + private ParquetValueReader defaultReader( + Types.NestedField field, ParquetValueReader reader, int constantDL) { + if (reader != null) { + return reader; + } else if (field.initialDefault() != null) { + return ParquetValueReaders.constant( + SparkUtil.internalToSpark(field.type(), field.initialDefault()), constantDL); + } else if (field.isOptional()) { + return ParquetValueReaders.nulls(); + } + + throw new IllegalArgumentException(String.format("Missing required field: %s", field.name())); + } + @Override public ParquetValueReader list( Types.ListType expectedList, GroupType array, ParquetValueReader elementReader) { From e4bda01dbea3e4f974b2239c557cd1462940d319 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 18 Apr 2025 11:55:12 -0700 Subject: [PATCH 06/11] Fix revapi. 
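
The previous commit narrowed the protected createStructReader hook from three
arguments to two, and revapi flags that as a breaking change to the protected
API of GenericParquetReaders and InternalReader. Keep the old three-argument
overload in both classes, deprecated until 1.10.0, delegating to the shared
record reader. Roughly, the two overloads now coexist like this (same code as
the hunks below, shown with generics for clarity):

    // new hook, called by BaseParquetReaders
    @Override
    protected ParquetValueReader<Record> createStructReader(
        List<ParquetValueReader<?>> fieldReaders, StructType structType) {
      return ParquetValueReaders.recordReader(fieldReaders, structType);
    }

    // old hook, kept only so the 1.9 protected API surface is unchanged
    /** @deprecated will be removed in 1.10.0; use the two-argument overload instead. */
    @Deprecated
    protected ParquetValueReader<Record> createStructReader(
        List<Type> types, List<ParquetValueReader<?>> fieldReaders, StructType structType) {
      return ParquetValueReaders.recordReader(fieldReaders, structType);
    }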
--- .../data/parquet/GenericParquetReaders.java | 13 +++++++++++++ .../iceberg/data/parquet/InternalReader.java | 14 ++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java index 4af7ee381f61..3571307e3d56 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java @@ -36,6 +36,7 @@ import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types.StructType; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.LogicalTypeAnnotation; @@ -58,6 +59,18 @@ public static ParquetValueReader buildReader( return INSTANCE.createReader(expectedSchema, fileSchema, idToConstant); } + /** + * Create a struct reader. + * + * @deprecated will be removed in 1.10.0; use {@link #createStructReader(List, StructType)} + * instead. + */ + @Deprecated + protected ParquetValueReader createStructReader( + List types, List> fieldReaders, StructType structType) { + return ParquetValueReaders.recordReader(fieldReaders, structType); + } + @Override protected ParquetValueReader createStructReader( List> fieldReaders, StructType structType) { diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java index 03585c55c9b6..3a5c12d07f81 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java @@ -24,6 +24,7 @@ import org.apache.iceberg.StructLike; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types.StructType; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.MessageType; @@ -46,6 +47,19 @@ public static ParquetValueReader create( return (ParquetValueReader) INSTANCE.createReader(expectedSchema, fileSchema, idToConstant); } + /** + * Create a struct reader. + * + * @deprecated will be removed in 1.10.0; use {@link #createStructReader(List, StructType)} + * instead. + */ + @Deprecated + @SuppressWarnings("unchecked") + protected ParquetValueReader createStructReader( + List types, List> fieldReaders, StructType structType) { + return (ParquetValueReader) ParquetValueReaders.recordReader(fieldReaders, structType); + } + @Override @SuppressWarnings("unchecked") protected ParquetValueReader createStructReader( From 42aaf260a58e223db99804b9f057970258db0c94 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 18 Apr 2025 11:57:17 -0700 Subject: [PATCH 07/11] Fix Flink tests. 
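
A constant reader built for a null value used to fall back to the shared
NullReader column set and so exposed no column at all. The idToConstant map
can legitimately carry nulls (null partition values, or a file without a
first_row_id), and that empty-column path is what the Flink readers appear to
trip over. Always bind a ConstantDLColumn at the parent definition level
instead, whether or not the constant is null. Illustrative use, with parentDl
standing in for the enclosing struct's max definition level:

    int parentDl = 1;
    ParquetValueReader<Long> nullConstant = ParquetValueReaders.constant(null, parentDl);
    // nullConstant.read(null)  -> still null
    // nullConstant.columns()   -> one ConstantDLColumn at parentDl, no longer empty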
--- .../org/apache/iceberg/parquet/ParquetValueReaders.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java index a4a5d040c183..978bef45aa36 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java @@ -280,13 +280,8 @@ private static class ConstantReader implements ParquetValueReader { ConstantReader(C constantValue, int parentDl) { this.constantValue = constantValue; - if (constantValue != null) { - this.column = new ConstantDLColumn<>(parentDl); - this.children = ImmutableList.of(column); - } else { - this.column = NullReader.NULL_COLUMN; - this.children = NullReader.COLUMNS; - } + this.column = new ConstantDLColumn<>(parentDl); + this.children = ImmutableList.of(column); } @Override From 4b5e0137b29e12f3ae64b9ddffc1762d23dbab3a Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 18 Apr 2025 12:31:34 -0700 Subject: [PATCH 08/11] Use position reader rather than duplicating. --- .../iceberg/parquet/ParquetValueReaders.java | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java index 978bef45aa36..a68fd71f91be 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java @@ -372,24 +372,22 @@ public void setPageSource(PageReadStore pageStore) { private static class RowIdReader implements ParquetValueReader { private final long firstRowId; private final ParquetValueReader idReader; - private long rowOffset = -1; - private long rowGroupStart; + private final ParquetValueReader posReader; private RowIdReader(long firstRowId, ParquetValueReader idReader) { this.firstRowId = firstRowId; this.idReader = idReader != null ? idReader : nulls(); + this.posReader = position(); } @Override public Long read(Long reuse) { - rowOffset += 1; - Long idFromFile = idReader.read(null); if (idFromFile != null) { return idFromFile; } - return firstRowId + rowGroupStart + rowOffset; + return firstRowId + posReader.read(null); } @Override @@ -405,14 +403,7 @@ public List> columns() { @Override public void setPageSource(PageReadStore pageStore) { idReader.setPageSource(pageStore); - this.rowGroupStart = - pageStore - .getRowIndexOffset() - .orElseThrow( - () -> - new IllegalArgumentException( - "PageReadStore does not contain row index offset")); - this.rowOffset = -1; + posReader.setPageSource(pageStore); } } From 47f9ee70798555b5f74ec9c8f20d16ffd1ff0d89 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 18 Apr 2025 12:42:18 -0700 Subject: [PATCH 09/11] Fix revapi. 
--- .../org/apache/iceberg/data/parquet/GenericParquetReaders.java | 2 +- .../java/org/apache/iceberg/data/parquet/InternalReader.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java index 3571307e3d56..182412cfb54c 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java @@ -36,12 +36,12 @@ import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types.StructType; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; public class GenericParquetReaders extends BaseParquetReaders { diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java index 3a5c12d07f81..692a9857cf77 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java @@ -24,10 +24,10 @@ import org.apache.iceberg.StructLike; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; -import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types.StructType; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; public class InternalReader extends BaseParquetReaders { From fc020407f195e360dbeb00a0a7ef3291e2aaf051 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 18 Apr 2025 13:21:04 -0700 Subject: [PATCH 10/11] Fix position reader use. --- .../java/org/apache/iceberg/parquet/ParquetValueReaders.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java index a68fd71f91be..e91db8282e60 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java @@ -382,12 +382,14 @@ private RowIdReader(long firstRowId, ParquetValueReader idReader) { @Override public Long read(Long reuse) { + // always call the position reader to keep the position accurate + long pos = posReader.read(null); Long idFromFile = idReader.read(null); if (idFromFile != null) { return idFromFile; } - return firstRowId + posReader.read(null); + return firstRowId + pos; } @Override From a18b51f2966a1df84084a475a2a056d9eb1d783b Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 18 Apr 2025 14:20:35 -0700 Subject: [PATCH 11/11] Spark: Add basic row lineage test. 
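
The Comet batched reader does not yet produce the new _row_id and
_last_updated_sequence_number metadata columns, so fall back to the row-based
Iceberg Parquet reader whenever either is projected. Also make the test row
comparison name the column that differs instead of re-comparing whole rows.
A rough sketch of the assertion the new test makes, assuming the usual sql()
and row() helpers from the Spark test base classes (the _row_id values depend
on the first_row_id assigned to each data file):

    List<Object[]> rows = sql("SELECT id, data, _row_id FROM %s ORDER BY id", tableName);
    assertEquals(
        "Should read back assigned row IDs",
        ImmutableList.of(row(1L, "a", 0L), row(2L, "b", 1L)),
        rows);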
--- .../java/org/apache/iceberg/spark/source/SparkBatch.java | 5 ++++- .../org/apache/iceberg/spark/SparkTestHelperBase.java | 9 ++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java index 11f054b11710..45bd48aea2ec 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java @@ -180,7 +180,10 @@ private boolean useCometBatchReads() { } private boolean supportsCometBatchReads(Types.NestedField field) { - return field.type().isPrimitiveType() && !field.type().typeId().equals(Type.TypeID.UUID); + return field.type().isPrimitiveType() + && !field.type().typeId().equals(Type.TypeID.UUID) + && field.fieldId() != MetadataColumns.ROW_ID.fieldId() + && field.fieldId() != MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(); } // conditions for using ORC batch reads: diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java index 0b3d0244a087..e1b75ca55e34 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java @@ -63,10 +63,7 @@ protected void assertEquals( Object[] expected = expectedRows.get(row); Object[] actual = actualRows.get(row); assertThat(actual).as("Number of columns should match").hasSameSizeAs(expected); - for (int col = 0; col < actualRows.get(row).length; col += 1) { - String newContext = String.format("%s: row %d col %d", context, row + 1, col + 1); - assertEquals(newContext, expected, actual); - } + assertEquals(context + ": row " + (row + 1), expected, actual); } } @@ -83,7 +80,9 @@ protected void assertEquals(String context, Object[] expectedRow, Object[] actua assertEquals(newContext, (Object[]) expectedValue, (Object[]) actualValue); } } else if (expectedValue != ANY) { - assertThat(actualValue).as(context + " contents should match").isEqualTo(expectedValue); + assertThat(actualValue) + .as(context + " col " + (col + 1) + " contents should match") + .isEqualTo(expectedValue); } } }
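
Taken together, the read path resolves row lineage like this (a summary of the
RowIdReader and LastUpdatedSeqReader behaviour above, not a new API):

    // _row_id: a value materialized in the data file wins; otherwise derive it
    // from the file's first_row_id (projected from the manifest) plus the row's
    // position; patch 10 advances the position reader unconditionally so that
    // position stays accurate even when the file stores explicit IDs.
    static Long rowId(Long idFromFile, long firstRowId, long rowPosition) {
      return idFromFile != null ? idFromFile : firstRowId + rowPosition;
    }

    // _last_updated_sequence_number: a value in the file wins; otherwise use the
    // per-file constant carried in idToConstant.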