From ca264e6df85a3b1526b8308ae2ee2632f01db2f3 Mon Sep 17 00:00:00 2001
From: geruh
Date: Thu, 19 Jun 2025 09:41:50 +0700
Subject: [PATCH] Backport non-vectorized readers for 3.4

---
 .../spark/data/SparkParquetReaders.java      | 60 ++++++++-----------
 .../iceberg/spark/source/SparkBatch.java     |  5 +-
 .../iceberg/spark/SparkTestHelperBase.java   |  7 ++-
 .../iceberg/spark/data/AvroDataTest.java     | 58 ++++++++++++++++++
 .../iceberg/spark/data/GenericsHelpers.java  | 45 ++++++++++++--
 .../spark/data/TestSparkParquetReader.java   | 45 +++++++++-----
 6 files changed, 163 insertions(+), 57 deletions(-)

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 223d7a8995dc..f9300099fe25 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -25,7 +25,6 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetUtil;
@@ -137,10 +136,12 @@ public ParquetValueReader<InternalRow> message(
     @Override
     public ParquetValueReader<?> struct(
         Types.StructType expected, GroupType struct, List<ParquetValueReader<?>> fieldReaders) {
+      if (null == expected) {
+        return new InternalRowReader(ImmutableList.of());
+      }
+
       // match the expected struct's order
       Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
-      Map<Integer, Type> typesById = Maps.newHashMap();
-      Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
       List<Type> fields = struct.getFields();
       for (int i = 0; i < fields.size(); i += 1) {
         Type fieldType = fields.get(i);
@@ -148,50 +149,39 @@ public ParquetValueReader<?> struct(
         if (fieldType.getId() != null) {
           int id = fieldType.getId().intValue();
           readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
-          typesById.put(id, fieldType);
-          if (idToConstant.containsKey(id)) {
-            maxDefinitionLevelsById.put(id, fieldD);
-          }
         }
       }
 
-      List<Types.NestedField> expectedFields =
-          expected != null ? expected.fields() : ImmutableList.of();
+      int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
+      List<Types.NestedField> expectedFields = expected.fields();
       List<ParquetValueReader<?>> reorderedFields =
           Lists.newArrayListWithExpectedSize(expectedFields.size());
-      // Defaulting to parent max definition level
-      int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
+
       for (Types.NestedField field : expectedFields) {
         int id = field.fieldId();
-        ParquetValueReader<?> reader = readersById.get(id);
-        if (idToConstant.containsKey(id)) {
-          // containsKey is used because the constant may be null
-          int fieldMaxDefinitionLevel =
-              maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
-          reorderedFields.add(
-              ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
-        } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
-          reorderedFields.add(ParquetValueReaders.position());
-        } else if (id == MetadataColumns.IS_DELETED.fieldId()) {
-          reorderedFields.add(ParquetValueReaders.constant(false));
-        } else if (reader != null) {
-          reorderedFields.add(reader);
-        } else if (field.initialDefault() != null) {
-          reorderedFields.add(
-              ParquetValueReaders.constant(
-                  SparkUtil.internalToSpark(field.type(), field.initialDefault()),
-                  maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel)));
-        } else if (field.isOptional()) {
-          reorderedFields.add(ParquetValueReaders.nulls());
-        } else {
-          throw new IllegalArgumentException(
-              String.format("Missing required field: %s", field.name()));
-        }
+        ParquetValueReader<?> reader =
+            ParquetValueReaders.replaceWithMetadataReader(
+                id, readersById.get(id), idToConstant, constantDefinitionLevel);
+        reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel));
       }
 
       return new InternalRowReader(reorderedFields);
     }
 
+    private ParquetValueReader<?> defaultReader(
+        Types.NestedField field, ParquetValueReader<?> reader, int constantDL) {
+      if (reader != null) {
+        return reader;
+      } else if (field.initialDefault() != null) {
+        return ParquetValueReaders.constant(
+            SparkUtil.internalToSpark(field.type(), field.initialDefault()), constantDL);
+      } else if (field.isOptional()) {
+        return ParquetValueReaders.nulls();
+      }
+
+      throw new IllegalArgumentException(String.format("Missing required field: %s", field.name()));
+    }
+
     @Override
     public ParquetValueReader<?> list(
         Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index 11f054b11710..45bd48aea2ec 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -180,7 +180,10 @@ private boolean useCometBatchReads() {
   }
 
   private boolean supportsCometBatchReads(Types.NestedField field) {
-    return field.type().isPrimitiveType() && !field.type().typeId().equals(Type.TypeID.UUID);
+    return field.type().isPrimitiveType()
+        && !field.type().typeId().equals(Type.TypeID.UUID)
+        && field.fieldId() != MetadataColumns.ROW_ID.fieldId()
+        && field.fieldId() != MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId();
   }
 
   // conditions for using ORC batch reads:
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java
index 022702d85378..bb10e7b700b9 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java
@@ -64,8 +64,7 @@ protected void assertEquals(
       Object[] actual = actualRows.get(row);
       assertThat(actual).as("Number of columns should match").hasSameSizeAs(expected);
       for (int col = 0; col < actualRows.get(row).length; col += 1) {
-        String newContext = String.format("%s: row %d col %d", context, row + 1, col + 1);
-        assertEquals(newContext, expected, actual);
+        assertEquals(context + ": row " + (row + 1), expected, actual);
       }
     }
   }
@@ -83,7 +82,9 @@ protected void assertEquals(String context, Object[] expectedRow, Object[] actua
         assertEquals(newContext, (Object[]) expectedValue, (Object[]) actualValue);
       }
     } else if (expectedValue != ANY) {
-      assertThat(actualValue).as("%s contents should match", context).isEqualTo(expectedValue);
+      assertThat(actualValue)
+          .as(context + " col " + (col + 1) + " contents should match")
+          .isEqualTo(expectedValue);
     }
   }
 }
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
index 4e4000794ec4..a31138ae01a2 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
@@ -27,10 +27,15 @@
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Type;
@@ -51,6 +56,14 @@ public abstract class AvroDataTest {
 
+  private static final long FIRST_ROW_ID = 2_000L;
+  protected static final Map<Integer, Object> ID_TO_CONSTANT =
+      Map.of(
+          MetadataColumns.ROW_ID.fieldId(),
+          FIRST_ROW_ID,
+          MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(),
+          34L);
+
   protected abstract void writeAndValidate(Schema schema) throws IOException;
 
   protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
@@ -58,6 +71,12 @@ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throw
         "Cannot run test, writeAndValidate(Schema, Schema) is not implemented");
   }
 
+  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List<Record> records)
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "Cannot run test, writeAndValidate(Schema, Schema, List) is not implemented");
+  }
+
   protected boolean supportsDefaultValues() {
     return false;
   }
@@ -66,6 +85,10 @@ protected boolean supportsNestedTypes() {
     return true;
   }
 
+  protected boolean supportsRowLineage() {
+    return false;
+  }
+
   protected static final StructType SUPPORTED_PRIMITIVES =
       StructType.of(
           required(100, "id", LongType.get()),
@@ -547,4 +570,39 @@ public void testPrimitiveTypeDefaultValues(Type.PrimitiveType type, Literal<?> d
 
     writeAndValidate(writeSchema, readSchema);
   }
+
+  @Test
+  public void testRowLineage() throws Exception {
+    Assumptions.assumeThat(supportsRowLineage())
+        .as("Row lineage support is not implemented")
+        .isTrue();
+
+    Schema schema =
+        new Schema(
+            required(1, "id", LongType.get()),
+            required(2, "data", Types.StringType.get()),
+            MetadataColumns.ROW_ID,
+            MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER);
+
+    GenericRecord record = GenericRecord.create(schema);
+
+    writeAndValidate(
+        schema,
+        schema,
+        List.of(
+            record.copy(Map.of("id", 1L, "data", "a")),
+            record.copy(Map.of("id", 2L, "data", "b")),
+            record.copy(
+                Map.of(
+                    "id",
+                    3L,
+                    "data",
+                    "c",
+                    "_row_id",
+                    1_000L,
+                    "_last_updated_sequence_number",
+                    33L)),
+            record.copy(Map.of("id", 4L, "data", "d", "_row_id", 1_001L)),
+            record.copy(Map.of("id", 5L, "data", "e"))));
+  }
 }
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java
index e7f6389cb55c..7d6e51a0dfab 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java
@@ -38,6 +38,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.data.GenericDataUtil;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Type;
@@ -198,12 +200,47 @@ private static void assertEqualsSafe(Type type, Object expected, Object actual)
 
   public static void assertEqualsUnsafe(
       Types.StructType struct, Record expected, InternalRow actual) {
+    assertEqualsUnsafe(struct, expected, actual, null, -1);
+  }
+
+  public static void assertEqualsUnsafe(
+      Types.StructType struct,
+      Record expected,
+      InternalRow actual,
+      Map<Integer, ?> idToConstant,
+      int pos) {
+    Types.StructType expectedType = expected.struct();
     List<Types.NestedField> fields = struct.fields();
-    for (int i = 0; i < fields.size(); i += 1) {
-      Type fieldType = fields.get(i).type();
+    for (int readPos = 0; readPos < fields.size(); readPos += 1) {
+      Types.NestedField field = fields.get(readPos);
+      Types.NestedField expectedField = expectedType.field(field.fieldId());
 
-      Object expectedValue = expected.get(i);
-      Object actualValue = actual.get(i, convert(fieldType));
+      Type fieldType = field.type();
+      Object actualValue =
+          actual.isNullAt(readPos) ? null : actual.get(readPos, convert(fieldType));
+
+      Object expectedValue;
+      if (expectedField != null) {
+        int id = expectedField.fieldId();
+        if (id == MetadataColumns.ROW_ID.fieldId()) {
+          expectedValue = expected.getField(expectedField.name());
+          if (expectedValue == null && idToConstant != null) {
+            expectedValue = (Long) idToConstant.get(id) + pos;
+          }
+
+        } else if (id == MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId()) {
+          expectedValue = expected.getField(expectedField.name());
+          if (expectedValue == null && idToConstant != null) {
+            expectedValue = idToConstant.get(id);
+          }
+
+        } else {
+          expectedValue = expected.getField(expectedField.name());
+        }
+      } else {
+        // comparison expects Iceberg's generic representation
+        expectedValue = GenericDataUtil.internalToGeneric(field.type(), field.initialDefault());
+      }
 
       assertEqualsUnsafe(fieldType, expectedValue, actualValue);
     }
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
index ca2d010b7537..d3d1ca1338dc 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.spark.data;
 
-import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -28,7 +27,6 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.avro.generic.GenericData;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
@@ -38,7 +36,9 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.inmemory.InMemoryOutputFile;
 import org.apache.iceberg.io.CloseableIterable;
@@ -70,6 +70,13 @@ protected void writeAndValidate(Schema schema) throws IOException {
 
   @Override
   protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
+    List<Record> expected = RandomGenericData.generate(writeSchema, 100, 0L);
+    writeAndValidate(writeSchema, expectedSchema, expected);
+  }
+
+  @Override
+  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List<Record> expected)
+      throws IOException {
     assumeThat(
         null
             == TypeUtil.find(
@@ -79,29 +86,39 @@ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throw
         .as("Parquet Avro cannot write non-string map keys")
         .isTrue();
 
-    List<GenericData.Record> expected = RandomData.generateList(writeSchema, 100, 0L);
-
-    OutputFile outputFile = new InMemoryOutputFile();
-
-    try (FileAppender<GenericData.Record> writer =
-        Parquet.write(outputFile).schema(writeSchema).named("test").build()) {
+    OutputFile output = new InMemoryOutputFile();
+    try (FileAppender<Record> writer =
+        Parquet.write(output)
+            .schema(writeSchema)
+            .createWriterFunc(GenericParquetWriter::create)
+            .named("test")
+            .build()) {
       writer.addAll(expected);
     }
 
     try (CloseableIterable<InternalRow> reader =
-        Parquet.read(outputFile.toInputFile())
+        Parquet.read(output.toInputFile())
             .project(expectedSchema)
-            .createReaderFunc(type -> SparkParquetReaders.buildReader(expectedSchema, type))
+            .createReaderFunc(
+                type -> SparkParquetReaders.buildReader(expectedSchema, type, ID_TO_CONSTANT))
             .build()) {
       Iterator<InternalRow> rows = reader.iterator();
-      for (GenericData.Record record : expected) {
-        assertThat(rows.hasNext()).as("Should have expected number of rows").isTrue();
-        assertEqualsUnsafe(expectedSchema.asStruct(), record, rows.next());
+      int pos = 0;
+      for (Record record : expected) {
+        assertThat(rows).as("Should have expected number of rows").hasNext();
+        GenericsHelpers.assertEqualsUnsafe(
+            expectedSchema.asStruct(), record, rows.next(), ID_TO_CONSTANT, pos);
+        pos += 1;
       }
-      assertThat(rows.hasNext()).as("Should not have extra rows").isFalse();
+      assertThat(rows).as("Should not have extra rows").isExhausted();
     }
   }
 
+  @Override
+  protected boolean supportsRowLineage() {
+    return true;
+  }
+
   @Override
   protected boolean supportsDefaultValues() {
     return true;