@@ -26,9 +26,13 @@

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.RandomGenericData;
@@ -39,27 +43,46 @@
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Function;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.data.AvroDataTestBase;
import org.apache.iceberg.spark.data.GenericsHelpers;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.spark.data.SparkParquetReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.Type.PrimitiveType;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class TestParquetVectorizedReads extends AvroDataTestBase {
private static final int NUM_ROWS = 200_000;
static final int BATCH_SIZE = 10_000;

private static final String PLAIN = "PLAIN";
private static final List<String> GOLDEN_FILE_ENCODINGS =
ImmutableList.of("PLAIN_DICTIONARY", "RLE", "RLE_DICTIONARY", "DELTA_BINARY_PACKED");
private static final Map<String, PrimitiveType> GOLDEN_FILE_TYPES =
ImmutableMap.of(
"string", Types.StringType.get(),
"float", Types.FloatType.get(),
"int32", Types.IntegerType.get(),
"int64", Types.LongType.get(),
"binary", Types.BinaryType.get(),
"boolean", Types.BooleanType.get());

static final Function<Record, Record> IDENTITY = record -> record;

@Override
@@ -416,4 +439,62 @@ public void testUuidReads() throws Exception {
}
assertRecordsMatch(schema, numRows, data, dataFile, false, BATCH_SIZE);
}

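// Decodes both files with the row-based Spark Parquet reader and asserts that
// they contain the same rows (see the review comment below about switching to
// the vectorized reader).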
private void assertIdenticalFileContents(File actual, File expected, Schema schema)
throws IOException {
try (CloseableIterable<InternalRow> actualReader =
Parquet.read(Files.localInput(actual))
.project(schema)
.createReaderFunc(t -> SparkParquetReaders.buildReader(schema, t, ID_TO_CONSTANT))
Contributor:
@eric-maynard Looks like we overlooked here: it should build the vectorized reader instead. Could you please open a follow up PR to fix this? Thanks!
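For context, a minimal sketch of the vectorized variant the comment asks for, assuming the `createBatchedReaderFunc`/`recordsPerBatch` builder methods and the `VectorizedSparkParquetReaders.buildReader(schema, type, ID_TO_CONSTANT)` overload imported above (the exact overload may differ across Iceberg versions). Note that the batched path yields `ColumnarBatch` rather than `InternalRow`, so rows would be collected via `rowIterator()`:

// Sketch only, not the merged code: batched reads produce ColumnarBatch.
try (CloseableIterable<ColumnarBatch> actualBatches =
    Parquet.read(Files.localInput(actual))
        .project(schema)
        .recordsPerBatch(BATCH_SIZE)
        .createBatchedReaderFunc(
            t -> VectorizedSparkParquetReaders.buildReader(schema, t, ID_TO_CONSTANT))
        .build()) {
  List<InternalRow> actualList = Lists.newArrayList();
  for (ColumnarBatch batch : actualBatches) {
    // ColumnarBatch.rowIterator() exposes the batch contents row by row.
    batch.rowIterator().forEachRemaining(actualList::add);
  }
}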

.build()) {
Iterator<InternalRow> actualIterator = actualReader.iterator();
try (CloseableIterable<InternalRow> plainReader =
Parquet.read(Files.localInput(expected))
.project(schema)
.createReaderFunc(t -> SparkParquetReaders.buildReader(schema, t, ID_TO_CONSTANT))
.build()) {
Iterator<InternalRow> expectedIterator = plainReader.iterator();

List<InternalRow> expectedList = Lists.newArrayList();
expectedIterator.forEachRemaining(expectedList::add);
List<InternalRow> actualList = Lists.newArrayList();
actualIterator.forEachRemaining(actualList::add);

assertThat(actualList)
.as("Comparison between files failed %s <-> %s", actual, expected)
.isNotEmpty()
.hasSameSizeAs(expectedList)
.hasSameElementsAs(expectedList);
}
}
}

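// Builds the cartesian product of golden-file encodings and primitive types,
// consumed by the @MethodSource test below.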
static Stream<Arguments> goldenFilesAndEncodings() {
return GOLDEN_FILE_ENCODINGS.stream()
.flatMap(
encoding ->
GOLDEN_FILE_TYPES.entrySet().stream()
.map(
typeEntry ->
Arguments.of(encoding, typeEntry.getKey(), typeEntry.getValue())));
}

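// Verifies that a golden file written with the given encoding decodes to the
// same rows as the PLAIN-encoded file for the same type.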
@ParameterizedTest
@MethodSource("goldenFilesAndEncodings")
public void testGoldenFiles(String encoding, String typeName, PrimitiveType primitiveType)
throws Exception {
Path goldenResourcePath = Paths.get("encodings", encoding, typeName + ".parquet");
URL goldenFileUrl = getClass().getClassLoader().getResource(goldenResourcePath.toString());
assumeThat(goldenFileUrl).as("type/encoding pair exists").isNotNull();

Path plainResourcePath = Paths.get("encodings", PLAIN, typeName + ".parquet");
URL plainFileUrl = getClass().getClassLoader().getResource(plainResourcePath.toString());
if (plainFileUrl == null) {
throw new IllegalStateException("PLAIN encoded file should exist: " + plainResourcePath);
}

Schema expectedSchema = new Schema(optional(1, "data", primitiveType));
assertIdenticalFileContents(
new File(goldenFileUrl.toURI()), new File(plainFileUrl.toURI()), expectedSchema);
}
}
Binary files not shown (golden Parquet files under encodings/&lt;encoding&gt;/&lt;type&gt;.parquet).