diff --git a/spark/src/main/java/com/netflix/iceberg/spark/data/SparkParquetWriters.java b/spark/src/main/java/com/netflix/iceberg/spark/data/SparkParquetWriters.java
new file mode 100644
index 000000000000..470beaa30a28
--- /dev/null
+++ b/spark/src/main/java/com/netflix/iceberg/spark/data/SparkParquetWriters.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg.spark.data;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.netflix.iceberg.Schema;
+import com.netflix.iceberg.parquet.ParquetTypeVisitor;
+import com.netflix.iceberg.parquet.ParquetValueReaders;
+import com.netflix.iceberg.parquet.ParquetValueReaders.ReusableEntry;
+import com.netflix.iceberg.parquet.ParquetValueWriter;
+import com.netflix.iceberg.parquet.ParquetValueWriters;
+import com.netflix.iceberg.parquet.ParquetValueWriters.PrimitiveWriter;
+import com.netflix.iceberg.parquet.ParquetValueWriters.RepeatedKeyValueWriter;
+import com.netflix.iceberg.parquet.ParquetValueWriters.RepeatedWriter;
+import com.netflix.iceberg.types.TypeUtil;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.UTF8String;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import static com.netflix.iceberg.parquet.ParquetValueWriters.option;
+import static com.netflix.iceberg.spark.SparkSchemaUtil.convert;
+
+public class SparkParquetWriters {
+  private SparkParquetWriters() {
+  }
+
+  @SuppressWarnings("unchecked")
+  public static ParquetValueWriter<InternalRow> buildWriter(Schema schema, MessageType type) {
+    return (ParquetValueWriter<InternalRow>) ParquetTypeVisitor.visit(type, new WriteBuilder(schema, type));
+  }
+
+  private static class WriteBuilder extends ParquetTypeVisitor<ParquetValueWriter<?>> {
+    private final Schema schema;
+    private final MessageType type;
+
+    WriteBuilder(Schema schema, MessageType type) {
+      this.schema = schema;
+      this.type = type;
+    }
+
+    @Override
+    public ParquetValueWriter<?> message(MessageType message,
+                                         List<ParquetValueWriter<?>> fieldWriters) {
+      return struct(message.asGroupType(), fieldWriters);
+    }
+
+    @Override
+    public ParquetValueWriter<?> struct(GroupType struct,
+                                        List<ParquetValueWriter<?>> fieldWriters) {
+      List<Type> fields = struct.getFields();
+      List<ParquetValueWriter<?>> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size());
+      List<DataType> sparkTypes = Lists.newArrayList();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = struct.getType(i);
+        int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName()));
+        writers.add(option(fieldType, fieldD, fieldWriters.get(i)));
+        sparkTypes.add(convert(schema.findType(fieldType.getId().intValue())));
+      }
+
+      return new InternalRowWriter(writers, sparkTypes);
+    }
+
+    @Override
+    public ParquetValueWriter<?> list(GroupType array, ParquetValueWriter<?> elementWriter) {
+      GroupType repeated = array.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath);
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath);
+
+      org.apache.parquet.schema.Type elementType = repeated.getType(0);
+      int elementD = type.getMaxDefinitionLevel(path(elementType.getName()));
+
+      DataType elementSparkType = convert(schema.findType(elementType.getId().intValue()));
+
+      return new ArrayDataWriter<>(repeatedD, repeatedR,
+          option(elementType, elementD, elementWriter),
+          elementSparkType);
+    }
+
+    @Override
+    public ParquetValueWriter<?> map(GroupType map,
+                                     ParquetValueWriter<?> keyWriter,
+                                     ParquetValueWriter<?> valueWriter) {
+      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath);
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath);
+
+      org.apache.parquet.schema.Type keyType = repeatedKeyValue.getType(0);
+      int keyD = type.getMaxDefinitionLevel(path(keyType.getName()));
+      DataType keySparkType = convert(schema.findType(keyType.getId().intValue()));
+      org.apache.parquet.schema.Type valueType = repeatedKeyValue.getType(1);
+      int valueD = type.getMaxDefinitionLevel(path(valueType.getName()));
+      DataType valueSparkType = convert(schema.findType(valueType.getId().intValue()));
+
+      return new MapDataWriter<>(repeatedD, repeatedR,
+          option(keyType, keyD, keyWriter), option(valueType, valueD, valueWriter),
+          keySparkType, valueSparkType);
+    }
+
+    @Override
+    public ParquetValueWriter<?> primitive(PrimitiveType primitive) {
+      ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+      if (primitive.getOriginalType() != null) {
+        switch (primitive.getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+            return utf8Strings(desc);
+          case DATE:
+          case INT_8:
+          case INT_16:
+          case INT_32:
+          case INT_64:
+          case TIME_MICROS:
+          case TIMESTAMP_MICROS:
+            return ParquetValueWriters.unboxed(desc);
+          case DECIMAL:
+            DecimalMetadata decimal = primitive.getDecimalMetadata();
+            switch (primitive.getPrimitiveTypeName()) {
+              case INT32:
+                return decimalAsInteger(desc, decimal.getPrecision(), decimal.getScale());
+              case INT64:
+                return decimalAsLong(desc, decimal.getPrecision(), decimal.getScale());
+              case BINARY:
+              case FIXED_LEN_BYTE_ARRAY:
+                return decimalAsFixed(desc, decimal.getPrecision(), decimal.getScale());
+              default:
+                throw new UnsupportedOperationException(
+                    "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
+            }
+          case BSON:
+            return byteArrays(desc);
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported logical type: " + primitive.getOriginalType());
+        }
+      }
+
+      switch (primitive.getPrimitiveTypeName()) {
+        case FIXED_LEN_BYTE_ARRAY:
+        case BINARY:
+          return byteArrays(desc);
+        case BOOLEAN:
+        case INT32:
+        case INT64:
+        case FLOAT:
+        case DOUBLE:
+          return ParquetValueWriters.unboxed(desc);
+        default:
+          throw new UnsupportedOperationException("Unsupported type: " + primitive);
+      }
+    }
+
+    private String[] currentPath() {
+      String[] path = new String[fieldNames.size()];
+      if (!fieldNames.isEmpty()) {
+        Iterator<String> iter = fieldNames.descendingIterator();
+        for (int i = 0; iter.hasNext(); i += 1) {
+          path[i] = iter.next();
+        }
+      }
+
+      return path;
+    }
+
+    private String[] path(String name) {
+      String[] path = new String[fieldNames.size() + 1];
+      path[fieldNames.size()] = name;
+
+      if (!fieldNames.isEmpty()) {
+        Iterator<String> iter = fieldNames.descendingIterator();
+        for (int i = 0; iter.hasNext(); i += 1) {
+          path[i] = iter.next();
+        }
+      }
+
+      return path;
+    }
+  }
+
+  private static PrimitiveWriter<UTF8String> utf8Strings(ColumnDescriptor desc) {
+    return new UTF8StringWriter(desc);
+  }
+
+  private static PrimitiveWriter<Decimal> decimalAsInteger(ColumnDescriptor desc,
+                                                           int precision, int scale) {
+    return new IntegerDecimalWriter(desc, precision, scale);
+  }
+
+  private static PrimitiveWriter<Decimal> decimalAsLong(ColumnDescriptor desc,
+                                                        int precision, int scale) {
+    return new LongDecimalWriter(desc, precision, scale);
+  }
+
+  private static PrimitiveWriter<Decimal> decimalAsFixed(ColumnDescriptor desc,
+                                                         int precision, int scale) {
+    return new FixedDecimalWriter(desc, precision, scale);
+  }
+
+  private static PrimitiveWriter<byte[]> byteArrays(ColumnDescriptor desc) {
+    return new ByteArrayWriter(desc);
+  }
+
+  private static class UTF8StringWriter extends PrimitiveWriter<UTF8String> {
+    private UTF8StringWriter(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public void write(int repetitionLevel, UTF8String value) {
+      column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value.getBytes()));
+    }
+  }
+
+  private static class IntegerDecimalWriter extends PrimitiveWriter<Decimal> {
+    private final int precision;
+    private final int scale;
+
+    private IntegerDecimalWriter(ColumnDescriptor desc, int precision, int scale) {
+      super(desc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public void write(int repetitionLevel, Decimal decimal) {
+      Preconditions.checkArgument(decimal.scale() == scale,
+          "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, decimal);
+      Preconditions.checkArgument(decimal.precision() <= precision,
+          "Cannot write value as decimal(%s,%s), too large: %s", precision, scale, decimal);
+
+      column.writeInteger(repetitionLevel, (int) decimal.toUnscaledLong());
+    }
+  }
+
+  private static class LongDecimalWriter extends PrimitiveWriter<Decimal> {
+    private final int precision;
+    private final int scale;
+
+    private LongDecimalWriter(ColumnDescriptor desc, int precision, int scale) {
+      super(desc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public void write(int repetitionLevel, Decimal decimal) {
+      Preconditions.checkArgument(decimal.scale() == scale,
+          "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, decimal);
+      Preconditions.checkArgument(decimal.precision() <= precision,
+          "Cannot write value as decimal(%s,%s), too large: %s", precision, scale, decimal);
+
+      column.writeLong(repetitionLevel, decimal.toUnscaledLong());
+    }
+  }
+
+  private static class FixedDecimalWriter extends PrimitiveWriter<Decimal> {
+    private final int precision;
+    private final int scale;
+    private final int length;
+    private final ThreadLocal<byte[]> bytes;
+
+    private FixedDecimalWriter(ColumnDescriptor desc, int precision, int scale) {
+      super(desc);
+      this.precision = precision;
+      this.scale = scale;
+      this.length = TypeUtil.decimalRequriedBytes(precision);
+      this.bytes = ThreadLocal.withInitial(() -> new byte[length]);
+    }
+
+    @Override
+    public void write(int repetitionLevel, Decimal decimal) {
+      Preconditions.checkArgument(decimal.scale() == scale,
+          "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, decimal);
+      Preconditions.checkArgument(decimal.precision() <= precision,
+          "Cannot write value as decimal(%s,%s), too large: %s", precision, scale, decimal);
+
+      BigDecimal bigDecimal = decimal.toJavaBigDecimal();
+
+      byte fillByte = (byte) (bigDecimal.signum() < 0 ? 0xFF : 0x00);
+      byte[] unscaled = bigDecimal.unscaledValue().toByteArray();
+      byte[] buf = bytes.get();
+      int offset = length - unscaled.length;
+
+      for (int i = 0; i < length; i += 1) {
+        if (i < offset) {
+          buf[i] = fillByte;
+        } else {
+          buf[i] = unscaled[i - offset];
+        }
+      }
+
+      column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(buf));
+    }
+  }
+
+  private static class ByteArrayWriter extends PrimitiveWriter<byte[]> {
+    private ByteArrayWriter(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public void write(int repetitionLevel, byte[] bytes) {
+      column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(bytes));
+    }
+  }
+
+  private static class ArrayDataWriter<E> extends RepeatedWriter<ArrayData, E> {
+    private final DataType elementType;
+
+    private ArrayDataWriter(int definitionLevel, int repetitionLevel,
+                            ParquetValueWriter<E> writer, DataType elementType) {
+      super(definitionLevel, repetitionLevel, writer);
+      this.elementType = elementType;
+    }
+
+    @Override
+    protected Iterator<E> elements(ArrayData list) {
+      return new ElementIterator<>(list);
+    }
+
+    private class ElementIterator<E> implements Iterator<E> {
+      private final int size;
+      private final ArrayData list;
+      private int index;
+
+      private ElementIterator(ArrayData list) {
+        this.list = list;
+        size = list.numElements();
+        index = 0;
+      }
+
+      @Override
+      public boolean hasNext() {
+        return index != size;
+      }
+
+      @Override
+      @SuppressWarnings("unchecked")
+      public E next() {
+        if (index >= size) {
+          throw new NoSuchElementException();
+        }
+
+        E element;
+        if (list.isNullAt(index)) {
+          element = null;
+        } else {
+          element = (E) list.get(index, elementType);
+        }
+
+        index += 1;
+
+        return element;
+      }
+    }
+  }
+
+  private static class MapDataWriter<K, V> extends RepeatedKeyValueWriter<MapData, K, V> {
+    private final DataType keyType;
+    private final DataType valueType;
+
+    private MapDataWriter(int definitionLevel, int repetitionLevel,
+                          ParquetValueWriter<K> keyWriter, ParquetValueWriter<V> valueWriter,
+                          DataType keyType, DataType valueType) {
+      super(definitionLevel, repetitionLevel, keyWriter, valueWriter);
+      this.keyType = keyType;
+      this.valueType = valueType;
+    }
+
+    @Override
+    protected Iterator<Map.Entry<K, V>> pairs(MapData map) {
+      return new EntryIterator<>(map);
+    }
+
+    private class EntryIterator<K, V> implements Iterator<Map.Entry<K, V>> {
+      private final int size;
+      private final ArrayData keys;
+      private final ArrayData values;
+      private final ReusableEntry<K, V> entry;
+      private final MapData map;
+      private int index;
+
+      private EntryIterator(MapData map) {
+        this.map = map;
+        size = map.numElements();
+        keys = map.keyArray();
+        values = map.valueArray();
+        entry = new ReusableEntry<>();
+        index = 0;
+      }
+
+      @Override
+      public boolean hasNext() {
+        return index != size;
+      }
+
+      @Override
+      @SuppressWarnings("unchecked")
+      public Map.Entry<K, V> next() {
+        if (index >= size) {
+          throw new NoSuchElementException();
+        }
+
+        if (values.isNullAt(index)) {
+          entry.set((K) keys.get(index, keyType), null);
+        } else {
+          entry.set((K) keys.get(index, keyType), (V) values.get(index, valueType));
+        }
+
+        index += 1;
+
+        return entry;
+      }
+    }
+  }
+
+  private static class InternalRowWriter extends ParquetValueWriters.StructWriter<InternalRow> {
+    private final DataType[] types;
+
+    private InternalRowWriter(List<ParquetValueWriter<?>> writers, List<DataType> types) {
+      super(writers);
+      this.types = types.toArray(new DataType[types.size()]);
+    }
+
+    @Override
+    protected Object get(InternalRow struct, int index) {
+      return struct.get(index, types[index]);
+    }
+  }
+}
diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
index e8c03624e1d9..6941186a05b3 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
@@ -42,9 +42,9 @@
 import com.netflix.iceberg.io.OutputFile;
 import com.netflix.iceberg.parquet.Parquet;
 import com.netflix.iceberg.spark.data.SparkAvroWriter;
+import com.netflix.iceberg.spark.data.SparkParquetWriters;
 import com.netflix.iceberg.util.Tasks;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
 import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.sources.v2.writer.DataWriter;
 import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
@@ -71,7 +71,6 @@
 import static com.netflix.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
 import static com.netflix.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
 import static com.netflix.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
-import static com.netflix.iceberg.spark.SparkSchemaUtil.convert;
 
 // TODO: parameterize DataSourceWriter with subclass of WriterCommitMessage
 class Writer implements DataSourceWriter {
@@ -214,14 +213,8 @@ public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat format) {
       try {
         switch (format) {
           case PARQUET:
-            String jsonSchema = convert(schema).json();
             return Parquet.write(file)
-                .writeSupport(new ParquetWriteSupport())
-                .set("org.apache.spark.sql.parquet.row.attributes", jsonSchema)
-                .set("spark.sql.parquet.writeLegacyFormat", "false")
-                .set("spark.sql.parquet.binaryAsString", "false")
-                .set("spark.sql.parquet.int96AsTimestamp", "false")
-                .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
+                .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(schema, msgType))
                 .setAll(properties)
                 .schema(schema)
                 .build();
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/data/RandomData.java b/spark/src/test/java/com/netflix/iceberg/spark/data/RandomData.java
index 710c4784c99a..f1616f84b2a2 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/data/RandomData.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/data/RandomData.java
@@ -59,19 +59,22 @@ public static List<Record> generateList(Schema schema, int numRecords, long seed) {
     return records;
   }
 
-  public static Iterator<InternalRow> generateSpark(Schema schema, int rows, long seed) {
-    return new Iterator<InternalRow>() {
-      private int rowsLeft = rows;
-      private final SparkRandomDataGenerator generator = new SparkRandomDataGenerator(seed);
+  public static Iterable<InternalRow> generateSpark(Schema schema, int numRecords, long seed) {
+    return () -> new Iterator<InternalRow>() {
+      private SparkRandomDataGenerator generator = new SparkRandomDataGenerator(seed);
+      private int count = 0;
 
       @Override
       public boolean hasNext() {
-        return rowsLeft > 0;
+        return count < numRecords;
       }
 
       @Override
       public InternalRow next() {
-        rowsLeft -= 1;
+        if (count >= numRecords) {
+          throw new NoSuchElementException();
+        }
+        count += 1;
         return (InternalRow) TypeUtil.visit(schema, generator);
       }
     };
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/data/TestHelpers.java b/spark/src/test/java/com/netflix/iceberg/spark/data/TestHelpers.java
index a9442b71d071..33480020df15 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/data/TestHelpers.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/data/TestHelpers.java
@@ -20,6 +20,7 @@
 package com.netflix.iceberg.spark.data;
 
 import com.google.common.collect.Lists;
+import com.netflix.iceberg.Schema;
 import com.netflix.iceberg.types.Type;
 import com.netflix.iceberg.types.Types;
 import org.apache.avro.generic.GenericData;
@@ -32,7 +33,12 @@
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.junit.Assert;
 import scala.collection.Seq;
@@ -557,4 +563,78 @@ private static void assertEqualBytes(String context, byte[] expected,
       Assert.assertArrayEquals(context, expected, actual);
     }
   }
+
+  static void assertEquals(Schema schema, Object expected, Object actual) {
+    assertEquals("schema", convert(schema), expected, actual);
+  }
+
+  private static void assertEquals(String context, DataType type, Object expected, Object actual) {
+    if (expected == null && actual == null) {
+      return;
+    }
+
+    if (type instanceof StructType) {
+      Assert.assertTrue("Expected should be an InternalRow: " + context,
+          expected instanceof InternalRow);
+      Assert.assertTrue("Actual should be an InternalRow: " + context,
+          actual instanceof InternalRow);
+      assertEquals(context, (StructType) type, (InternalRow) expected, (InternalRow) actual);
+
+    } else if (type instanceof ArrayType) {
+      Assert.assertTrue("Expected should be an ArrayData: " + context,
+          expected instanceof ArrayData);
+      Assert.assertTrue("Actual should be an ArrayData: " + context,
+          actual instanceof ArrayData);
+      assertEquals(context, (ArrayType) type, (ArrayData) expected, (ArrayData) actual);
+
+    } else if (type instanceof MapType) {
+      Assert.assertTrue("Expected should be a MapData: " + context,
+          expected instanceof MapData);
+      Assert.assertTrue("Actual should be a MapData: " + context,
+          actual instanceof MapData);
+      assertEquals(context, (MapType) type, (MapData) expected, (MapData) actual);
+
+    } else {
+      Assert.assertEquals("Value should match expected: " + context, expected, actual);
+    }
+  }
+
+  private static void assertEquals(String context, StructType struct,
+                                   InternalRow expected, InternalRow actual) {
+    Assert.assertEquals("Should have correct number of fields", struct.size(), actual.numFields());
+    for (int i = 0; i < actual.numFields(); i += 1) {
+      StructField field = struct.fields()[i];
+      DataType type = field.dataType();
+      assertEquals(context + "." + field.name(), type, expected.get(i, type), actual.get(i, type));
+    }
+  }
+
+  private static void assertEquals(String context, ArrayType array, ArrayData expected, ArrayData actual) {
+    Assert.assertEquals("Should have the same number of elements",
+        expected.numElements(), actual.numElements());
+    DataType type = array.elementType();
+    for (int i = 0; i < actual.numElements(); i += 1) {
+      assertEquals(context + ".element", type, expected.get(i, type), actual.get(i, type));
+    }
+  }
+
+  private static void assertEquals(String context, MapType map, MapData expected, MapData actual) {
+    Assert.assertEquals("Should have the same number of elements",
+        expected.numElements(), actual.numElements());
+
+    DataType keyType = map.keyType();
+    ArrayData expectedKeys = expected.keyArray();
+    ArrayData expectedValues = expected.valueArray();
+
+    DataType valueType = map.valueType();
+    ArrayData actualKeys = actual.keyArray();
+    ArrayData actualValues = actual.valueArray();
+
+    for (int i = 0; i < actual.numElements(); i += 1) {
+      assertEquals(context + ".key", keyType,
+          expectedKeys.get(i, keyType), actualKeys.get(i, keyType));
+      assertEquals(context + ".value", valueType,
+          expectedValues.get(i, valueType), actualValues.get(i, valueType));
+    }
+  }
 }
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/data/TestSparkParquetWriter.java b/spark/src/test/java/com/netflix/iceberg/spark/data/TestSparkParquetWriter.java
new file mode 100644
index 000000000000..61cec5ad4dba
--- /dev/null
+++ b/spark/src/test/java/com/netflix/iceberg/spark/data/TestSparkParquetWriter.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg.spark.data;
+
+import com.netflix.iceberg.Files;
+import com.netflix.iceberg.Schema;
+import com.netflix.iceberg.io.CloseableIterable;
+import com.netflix.iceberg.io.FileAppender;
+import com.netflix.iceberg.parquet.Parquet;
+import com.netflix.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import static com.netflix.iceberg.types.Types.NestedField.optional;
+import static com.netflix.iceberg.types.Types.NestedField.required;
+
+public class TestSparkParquetWriter {
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private static final Schema COMPLEX_SCHEMA = new Schema(
+      required(1, "roots", Types.LongType.get()),
+      optional(3, "lime", Types.ListType.ofRequired(4, Types.DoubleType.get())),
+      required(5, "strict", Types.StructType.of(
+          required(9, "tangerine", Types.StringType.get()),
+          optional(6, "hopeful", Types.StructType.of(
+              required(7, "steel", Types.FloatType.get()),
+              required(8, "lantern", Types.DateType.get())
+          )),
+          optional(10, "vehement", Types.LongType.get())
+      )),
+      optional(11, "metamorphosis", Types.MapType.ofRequired(12, 13,
+          Types.StringType.get(), Types.TimestampType.withZone())),
+      required(14, "winter", Types.ListType.ofOptional(15, Types.StructType.of(
+          optional(16, "beet", Types.DoubleType.get()),
+          required(17, "stamp", Types.FloatType.get()),
+          optional(18, "wheeze", Types.StringType.get())
+      ))),
+      optional(19, "renovate", Types.MapType.ofRequired(20, 21,
+          Types.StringType.get(), Types.StructType.of(
+              optional(22, "jumpy", Types.DoubleType.get()),
+              required(23, "koala", Types.IntegerType.get())
+          ))),
+      optional(2, "slide", Types.StringType.get())
+  );
+
+  @Test
+  public void testCorrectness() throws IOException {
+    int numRows = 250_000;
+    Iterable<InternalRow> records = RandomData.generateSpark(COMPLEX_SCHEMA, numRows, 19981);
+
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(testFile))
+        .schema(COMPLEX_SCHEMA)
+        .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(COMPLEX_SCHEMA, msgType))
+        .build()) {
+      writer.addAll(records);
+    }
+
+    try (CloseableIterable<InternalRow> reader = Parquet.read(Files.localInput(testFile))
+        .project(COMPLEX_SCHEMA)
+        .createReaderFunc(type -> SparkParquetReaders.buildReader(COMPLEX_SCHEMA, type))
+        .build()) {
+      Iterator<InternalRow> expected = records.iterator();
+      Iterator<InternalRow> rows = reader.iterator();
+      for (int i = 0; i < numRows; i += 1) {
+        Assert.assertTrue("Should have expected number of rows", rows.hasNext());
+        TestHelpers.assertEquals(COMPLEX_SCHEMA, expected.next(), rows.next());
+      }
+      Assert.assertFalse("Should not have extra rows", rows.hasNext());
+    }
+  }
+}
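
The new write path is driven entirely through Parquet.write(...).createWriterFunc(...), replacing the old ParquetWriteSupport configuration. As a minimal usage sketch (the names `schema`, `file`, and `rows` are placeholders for illustration, not part of this diff), a caller wires it up the same way Writer.newAppender and TestSparkParquetWriter do:

// Sketch: building an InternalRow appender with the SparkParquetWriters hook added above.
// Assumes an Iceberg Schema `schema`, an OutputFile `file`, and an Iterable<InternalRow> `rows`.
try (FileAppender<InternalRow> appender = Parquet.write(file)
    .schema(schema)
    .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(schema, msgType))
    .build()) {
  appender.addAll(rows);
}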