diff --git a/build.gradle b/build.gradle
index 3cf1df1604cf..ba15f59d5fd8 100644
--- a/build.gradle
+++ b/build.gradle
@@ -869,6 +869,8 @@ project(':iceberg-orc') {
 }
 
 project(':iceberg-parquet') {
+  apply plugin: 'java-test-fixtures'
+
   test {
     useJUnitPlatform()
   }
diff --git a/spark/v4.0/spark/src/test/resources/encodings/DELTA_BINARY_PACKED/int32.parquet b/parquet/src/testFixtures/resources/encodings/DELTA_BINARY_PACKED/int32.parquet
similarity index 100%
rename from spark/v4.0/spark/src/test/resources/encodings/DELTA_BINARY_PACKED/int32.parquet
rename to parquet/src/testFixtures/resources/encodings/DELTA_BINARY_PACKED/int32.parquet
diff --git a/spark/v4.0/spark/src/test/resources/encodings/DELTA_BINARY_PACKED/int64.parquet b/parquet/src/testFixtures/resources/encodings/DELTA_BINARY_PACKED/int64.parquet
similarity index 100%
rename from spark/v4.0/spark/src/test/resources/encodings/DELTA_BINARY_PACKED/int64.parquet
rename to parquet/src/testFixtures/resources/encodings/DELTA_BINARY_PACKED/int64.parquet
diff --git a/spark/v4.0/spark/src/test/resources/encodings/PLAIN/binary.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN/binary.parquet
similarity index 100%
rename from spark/v4.0/spark/src/test/resources/encodings/PLAIN/binary.parquet
rename to parquet/src/testFixtures/resources/encodings/PLAIN/binary.parquet
diff --git a/spark/v4.0/spark/src/test/resources/encodings/PLAIN/boolean.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN/boolean.parquet
similarity index 100%
rename from spark/v4.0/spark/src/test/resources/encodings/PLAIN/boolean.parquet
rename to parquet/src/testFixtures/resources/encodings/PLAIN/boolean.parquet
diff --git a/spark/v4.0/spark/src/test/resources/encodings/PLAIN/float.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN/float.parquet
similarity index 100%
rename from spark/v4.0/spark/src/test/resources/encodings/PLAIN/float.parquet
rename to parquet/src/testFixtures/resources/encodings/PLAIN/float.parquet
diff --git a/spark/v4.0/spark/src/test/resources/encodings/PLAIN/int32.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN/int32.parquet
similarity index 100%
rename from spark/v4.0/spark/src/test/resources/encodings/PLAIN/int32.parquet
rename to parquet/src/testFixtures/resources/encodings/PLAIN/int32.parquet
diff --git a/spark/v4.0/spark/src/test/resources/encodings/PLAIN/int64.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN/int64.parquet
similarity index 100%
rename from spark/v4.0/spark/src/test/resources/encodings/PLAIN/int64.parquet
rename to parquet/src/testFixtures/resources/encodings/PLAIN/int64.parquet
diff --git a/spark/v4.0/spark/src/test/resources/encodings/PLAIN/string.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN/string.parquet
similarity index 100%
rename from spark/v4.0/spark/src/test/resources/encodings/PLAIN/string.parquet
rename to parquet/src/testFixtures/resources/encodings/PLAIN/string.parquet
diff --git a/spark/v4.0/spark/src/test/resources/encodings/PLAIN_DICTIONARY/binary.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/binary.parquet
similarity index 100%
rename from spark/v4.0/spark/src/test/resources/encodings/PLAIN_DICTIONARY/binary.parquet
rename to parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/binary.parquet
diff --git a/spark/v4.0/spark/src/test/resources/encodings/PLAIN_DICTIONARY/float.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/float.parquet
similarity index 100%
rename from spark/v4.0/spark/src/test/resources/encodings/PLAIN_DICTIONARY/float.parquet
rename to parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/float.parquet
diff --git a/spark/v4.0/spark/src/test/resources/encodings/PLAIN_DICTIONARY/int32.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/int32.parquet
similarity index 100%
rename from spark/v4.0/spark/src/test/resources/encodings/PLAIN_DICTIONARY/int32.parquet
rename to parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/int32.parquet
diff --git a/spark/v4.0/spark/src/test/resources/encodings/PLAIN_DICTIONARY/int64.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/int64.parquet
similarity index 100%
rename from spark/v4.0/spark/src/test/resources/encodings/PLAIN_DICTIONARY/int64.parquet
rename to parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/int64.parquet
diff --git a/spark/v4.0/spark/src/test/resources/encodings/PLAIN_DICTIONARY/string.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/string.parquet
similarity index 100%
rename from spark/v4.0/spark/src/test/resources/encodings/PLAIN_DICTIONARY/string.parquet
rename to parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/string.parquet
diff --git a/spark/v4.0/spark/src/test/resources/encodings/RLE_DICTIONARY/binary.parquet b/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/binary.parquet
similarity index 100%
rename from spark/v4.0/spark/src/test/resources/encodings/RLE_DICTIONARY/binary.parquet
rename to parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/binary.parquet
diff --git a/spark/v4.0/spark/src/test/resources/encodings/RLE_DICTIONARY/float.parquet b/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/float.parquet
similarity index 100%
rename from spark/v4.0/spark/src/test/resources/encodings/RLE_DICTIONARY/float.parquet
rename to parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/float.parquet
diff --git a/spark/v4.0/spark/src/test/resources/encodings/RLE_DICTIONARY/int32.parquet b/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/int32.parquet
similarity index 100%
rename from spark/v4.0/spark/src/test/resources/encodings/RLE_DICTIONARY/int32.parquet
rename to parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/int32.parquet
diff --git a/spark/v4.0/spark/src/test/resources/encodings/RLE_DICTIONARY/int64.parquet b/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/int64.parquet
similarity index 100%
rename from spark/v4.0/spark/src/test/resources/encodings/RLE_DICTIONARY/int64.parquet
rename to parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/int64.parquet
diff --git a/spark/v4.0/spark/src/test/resources/encodings/RLE_DICTIONARY/string.parquet b/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/string.parquet
similarity index 100%
rename from spark/v4.0/spark/src/test/resources/encodings/RLE_DICTIONARY/string.parquet
rename to parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/string.parquet
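Context for the block above: the `java-test-fixtures` plugin publishes `src/testFixtures/resources` as a separate fixtures variant of `iceberg-parquet`, so the relocated golden files become ordinary classpath resources in any module that declares `testImplementation(testFixtures(project(':iceberg-parquet')))`. A minimal sketch of the resulting lookup, using a hypothetical `GoldenFiles` helper that is not part of this change:

    import java.net.URL;

    // Hypothetical helper: resolves a relocated golden file from the consumer's
    // test classpath, the same way TestParquetVectorizedReads does below.
    public class GoldenFiles {
      public static URL locate(String encoding, String typeName) {
        // e.g. "encodings/PLAIN/int32.parquet"
        String resource = "encodings/" + encoding + "/" + typeName + ".parquet";
        return GoldenFiles.class.getClassLoader().getResource(resource); // null if absent
      }

      public static void main(String[] args) {
        System.out.println(locate("PLAIN", "int32"));
      }
    }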
diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle
index 69700d84366d..125d9e9c9276 100644
--- a/spark/v3.5/build.gradle
+++ b/spark/v3.5/build.gradle
@@ -112,6 +112,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
     }
     testImplementation libs.sqlite.jdbc
     testImplementation libs.awaitility
+    testImplementation(testFixtures(project(':iceberg-parquet')))
 
     // runtime dependencies for running REST Catalog based integration test
     testRuntimeOnly libs.jetty.servlet
   }
@@ -185,6 +186,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
     testImplementation libs.parquet.hadoop
     testImplementation libs.awaitility
     testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:${libs.versions.comet.get()}"
+    testImplementation(testFixtures(project(':iceberg-parquet')))
 
     // Required because we remove antlr plugin dependencies from the compile configuration, see note above
     runtimeOnly libs.antlr.runtime
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
index 58404a62d882..e664db554bc0 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.data.vectorized.parquet;
 
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -26,22 +27,31 @@
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
+import java.util.stream.Stream;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.arrow.ArrowAllocation;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Function;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -49,20 +59,38 @@
 import org.apache.iceberg.spark.data.AvroDataTestBase;
 import org.apache.iceberg.spark.data.GenericsHelpers;
 import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
+import org.apache.iceberg.types.Type.PrimitiveType;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 public class TestParquetVectorizedReads extends AvroDataTestBase {
 
   private static final int NUM_ROWS = 200_000;
   static final int BATCH_SIZE = 10_000;
 
+  private static final String PLAIN = "PLAIN";
+  private static final List<String> GOLDEN_FILE_ENCODINGS =
+      ImmutableList.of("PLAIN_DICTIONARY", "RLE_DICTIONARY", "DELTA_BINARY_PACKED");
+  private static final Map<String, PrimitiveType> GOLDEN_FILE_TYPES =
+      ImmutableMap.of(
+          "string", Types.StringType.get(),
+          "float", Types.FloatType.get(),
+          "int32", Types.IntegerType.get(),
+          "int64", Types.LongType.get(),
+          "binary", Types.BinaryType.get(),
+          "boolean", Types.BooleanType.get());
+
   static final Function<Record, Record> IDENTITY = record -> record;
 
   @Override
@@ -376,12 +404,14 @@ public void testReadsForTypePromotedColumns() throws Exception {
   public void testSupportedReadsForParquetV2() throws Exception {
     // Float and double column types are written using plain encoding with Parquet V2,
     // also Parquet V2 will dictionary encode decimals that use fixed length binary
-    // (i.e. decimals > 8 bytes)
+    // (i.e. decimals > 8 bytes). Int and long types use DELTA_BINARY_PACKED.
     Schema schema =
         new Schema(
             optional(102, "float_data", Types.FloatType.get()),
             optional(103, "double_data", Types.DoubleType.get()),
-            optional(104, "decimal_data", Types.DecimalType.of(25, 5)));
+            optional(104, "decimal_data", Types.DecimalType.of(25, 5)),
+            optional(105, "int_data", Types.IntegerType.get()),
+            optional(106, "long_data", Types.LongType.get()));
 
     File dataFile = File.createTempFile("junit", null, temp.toFile());
     assertThat(dataFile.delete()).as("Delete should succeed").isTrue();
@@ -395,8 +425,7 @@
   @Test
   public void testUnsupportedReadsForParquetV2() throws Exception {
-    // Longs, ints, string types etc use delta encoding and which are not supported for vectorized
-    // reads
+    // Some types use delta encoding, which is not supported for vectorized reads
     Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields());
     File dataFile = File.createTempFile("junit", null, temp.toFile());
     assertThat(dataFile.delete()).as("Delete should succeed").isTrue();
@@ -439,4 +468,82 @@ protected void assertNoLeak(String testName, Consumer<BufferAllocator> testFunct
       allocator.close();
     }
   }
+
+  private void assertIdenticalFileContents(
+      File actual, File expected, Schema schema, boolean vectorized) throws IOException {
+    try (CloseableIterable<Record> expectedIterator =
+        Parquet.read(Files.localInput(expected))
+            .project(schema)
+            .createReaderFunc(msgType -> GenericParquetReaders.buildReader(schema, msgType))
+            .build()) {
+      List<Record> expectedRecords = Lists.newArrayList(expectedIterator);
+      if (vectorized) {
+        assertRecordsMatch(
+            schema, expectedRecords.size(), expectedRecords, actual, false, BATCH_SIZE);
+      } else {
+        try (CloseableIterable<InternalRow> actualIterator =
+            Parquet.read(Files.localInput(actual))
+                .project(schema)
+                .createReaderFunc(msgType -> SparkParquetReaders.buildReader(schema, msgType))
+                .build()) {
+          List<InternalRow> actualRecords = Lists.newArrayList(actualIterator);
+          assertThat(actualRecords).hasSameSizeAs(expectedRecords);
+          for (int i = 0; i < actualRecords.size(); i++) {
+            GenericsHelpers.assertEqualsUnsafe(
+                schema.asStruct(), expectedRecords.get(i), actualRecords.get(i));
+          }
+        }
+      }
+    }
+  }
+
+  static Stream<Arguments> goldenFilesAndEncodings() {
+    return GOLDEN_FILE_ENCODINGS.stream()
+        .flatMap(
+            encoding ->
+                GOLDEN_FILE_TYPES.entrySet().stream()
+                    .flatMap(
+                        e ->
+                            Stream.of(true, false)
+                                .map(
+                                    vectorized ->
+                                        Arguments.of(
+                                            encoding, e.getKey(), e.getValue(), vectorized))));
+  }
+
+  private File resourceUrlToLocalFile(URL url) throws IOException, URISyntaxException {
+    if ("file".equals(url.getProtocol())) {
+      return Paths.get(url.toURI()).toFile();
+    }
+
+    String name = Paths.get(url.getPath()).getFileName().toString(); // e.g., string.parquet
+    String suffix = name.contains(".") ? name.substring(name.lastIndexOf('.')) : "";
+    File tmp = File.createTempFile("golden-", suffix, temp.toFile());
+    try (InputStream in = url.openStream()) {
+      java.nio.file.Files.copy(in, tmp.toPath(), REPLACE_EXISTING);
+    }
+    return tmp;
+  }
+
+  @ParameterizedTest
+  @MethodSource("goldenFilesAndEncodings")
+  public void testGoldenFiles(
+      String encoding, String typeName, PrimitiveType primitiveType, boolean vectorized)
+      throws Exception {
+    Path goldenResourcePath = Paths.get("encodings", encoding, typeName + ".parquet");
+    URL goldenFileUrl = getClass().getClassLoader().getResource(goldenResourcePath.toString());
+    assumeThat(goldenFileUrl).as("type/encoding pair exists").isNotNull();
+
+    Path plainResourcePath = Paths.get("encodings", PLAIN, typeName + ".parquet");
+    URL plainFileUrl = getClass().getClassLoader().getResource(plainResourcePath.toString());
+    Preconditions.checkState(
+        plainFileUrl != null, "PLAIN encoded file should exist: " + plainResourcePath);
+
+    Schema expectedSchema = new Schema(optional(1, "data", primitiveType));
+    assertIdenticalFileContents(
+        resourceUrlToLocalFile(goldenFileUrl),
+        resourceUrlToLocalFile(plainFileUrl),
+        expectedSchema,
+        vectorized);
+  }
 }
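The `goldenFilesAndEncodings()` provider above expands a cartesian product: 3 non-PLAIN encodings x 6 type names x {vectorized, non-vectorized}, i.e. up to 36 parameter sets, and missing type/encoding pairs (for example, DELTA_BINARY_PACKED golden files exist only for int32 and int64) are skipped at runtime via `assumeThat`. A standalone sketch of the same expansion using only JDK types:

    import java.util.List;

    // Illustrative only: prints the encoding x type x vectorized matrix that
    // goldenFilesAndEncodings() feeds to testGoldenFiles via @MethodSource.
    public class GoldenFileMatrix {
      public static void main(String[] args) {
        List<String> encodings =
            List.of("PLAIN_DICTIONARY", "RLE_DICTIONARY", "DELTA_BINARY_PACKED");
        List<String> types = List.of("string", "float", "int32", "int64", "binary", "boolean");
        for (String encoding : encodings) {
          for (String type : types) {
            for (boolean vectorized : new boolean[] {true, false}) {
              System.out.printf(
                  "encodings/%s/%s.parquet vectorized=%b%n", encoding, type, vectorized);
            }
          }
        }
      }
    }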
diff --git a/spark/v4.0/build.gradle b/spark/v4.0/build.gradle
index 9c7ea06f9938..825bd17010ec 100644
--- a/spark/v4.0/build.gradle
+++ b/spark/v4.0/build.gradle
@@ -117,6 +117,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
     }
     testImplementation libs.sqlite.jdbc
     testImplementation libs.awaitility
+    testImplementation(testFixtures(project(':iceberg-parquet')))
 
     // runtime dependencies for running REST Catalog based integration test
     testRuntimeOnly libs.jetty.servlet
   }
@@ -190,6 +191,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
     testImplementation libs.parquet.hadoop
     testImplementation libs.awaitility
     testImplementation "org.apache.datafusion:comet-spark-spark3.5_2.13:${libs.versions.comet.get()}"
+    testImplementation(testFixtures(project(':iceberg-parquet')))
 
     // Required because we remove antlr plugin dependencies from the compile configuration, see note above
     runtimeOnly libs.antlr.runtime413
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
index a26496d4e24d..2e46088cfe6d 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.data.vectorized.parquet;
 
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -26,6 +27,8 @@
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -46,6 +49,7 @@
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Function;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -493,6 +497,20 @@ static Stream<Arguments> goldenFilesAndEncodings() {
                                         encoding, e.getKey(), e.getValue(), vectorized))));
   }
 
+  private File resourceUrlToLocalFile(URL url) throws IOException, URISyntaxException {
+    if ("file".equals(url.getProtocol())) {
+      return Paths.get(url.toURI()).toFile();
+    }
+
+    String name = Paths.get(url.getPath()).getFileName().toString(); // e.g., string.parquet
+    String suffix = name.contains(".") ? name.substring(name.lastIndexOf('.')) : "";
+    File tmp = File.createTempFile("golden-", suffix, temp.toFile());
+    try (InputStream in = url.openStream()) {
+      java.nio.file.Files.copy(in, tmp.toPath(), REPLACE_EXISTING);
+    }
+    return tmp;
+  }
+
   @ParameterizedTest
   @MethodSource("goldenFilesAndEncodings")
   public void testGoldenFiles(
@@ -500,18 +518,17 @@ public void testGoldenFiles(
       throws Exception {
     Path goldenResourcePath = Paths.get("encodings", encoding, typeName + ".parquet");
     URL goldenFileUrl = getClass().getClassLoader().getResource(goldenResourcePath.toString());
-    assumeThat(goldenFileUrl).isNotNull().as("type/encoding pair exists");
+    assumeThat(goldenFileUrl).as("type/encoding pair exists").isNotNull();
 
     Path plainResourcePath = Paths.get("encodings", PLAIN, typeName + ".parquet");
     URL plainFileUrl = getClass().getClassLoader().getResource(plainResourcePath.toString());
-    if (plainFileUrl == null) {
-      throw new IllegalStateException("PLAIN encoded file should exist: " + plainResourcePath);
-    }
+    Preconditions.checkState(
+        plainFileUrl != null, "PLAIN encoded file should exist: " + plainResourcePath);
 
     Schema expectedSchema = new Schema(optional(1, "data", primitiveType));
     assertIdenticalFileContents(
-        new File(goldenFileUrl.toURI()),
-        new File(plainFileUrl.toURI()),
+        resourceUrlToLocalFile(goldenFileUrl),
+        resourceUrlToLocalFile(plainFileUrl),
         expectedSchema,
         vectorized);
   }
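Why the `resourceUrlToLocalFile` helper is needed in both modules: once the golden files are consumed from the iceberg-parquet test-fixtures jar rather than a resources directory, `ClassLoader.getResource` returns a `jar:` URL, and the previous `new File(url.toURI())` throws `IllegalArgumentException` because a `jar:` URI is not a hierarchical `file:` URI. A self-contained sketch of the same fallback (class name illustrative; assumes copying to a temp file is acceptable in tests):

    import java.io.File;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URISyntaxException;
    import java.net.URL;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;

    // Illustrative only: mirrors the resourceUrlToLocalFile helper in the diff.
    public final class ResourceFiles {
      private ResourceFiles() {}

      public static File toLocalFile(URL url) throws IOException, URISyntaxException {
        if ("file".equals(url.getProtocol())) {
          // Resource sits on disk (e.g. resolved from src/testFixtures/resources).
          return Paths.get(url.toURI()).toFile();
        }
        // Resource lives inside a jar (jar: URL): stream it out to a temp file.
        String name = Paths.get(url.getPath()).getFileName().toString();
        String suffix = name.contains(".") ? name.substring(name.lastIndexOf('.')) : "";
        File tmp = File.createTempFile("golden-", suffix);
        tmp.deleteOnExit();
        try (InputStream in = url.openStream()) {
          Files.copy(in, tmp.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }
        return tmp;
      }
    }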