diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
index 35b8981ca0fe..0687ecf97a89 100644
--- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
+++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
@@ -81,13 +81,14 @@ public Schema record(Schema record, List names, Iterable s
     List expectedFields = struct.fields();
     for (int i = 0; i < expectedFields.size(); i += 1) {
       Types.NestedField field = expectedFields.get(i);
+      String sanitizedFieldName = AvroSchemaUtil.makeCompatibleName(field.name());
 
       // detect reordering
-      if (i < fields.size() && !field.name().equals(fields.get(i).name())) {
+      if (i < fields.size() && !sanitizedFieldName.equals(fields.get(i).name())) {
         hasChange = true;
       }
 
-      Schema.Field avroField = updateMap.get(AvroSchemaUtil.makeCompatibleName(field.name()));
+      Schema.Field avroField = updateMap.get(sanitizedFieldName);
 
       if (avroField != null) {
         updatedFields.add(avroField);
@@ -123,7 +124,7 @@ public Schema.Field field(Schema.Field field, Supplier fieldResult) {
       return null;
     }
 
-    String expectedName = expectedField.name();
+    String expectedName = AvroSchemaUtil.makeCompatibleName(expectedField.name());
     this.current = expectedField.type();
     try {
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 9a36266ffdf2..ea8f725f3afc 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -26,6 +26,7 @@
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -66,23 +67,29 @@ public class SparkParquetReaders {
   private SparkParquetReaders() {
   }
 
-  @SuppressWarnings("unchecked")
   public static ParquetValueReader buildReader(Schema expectedSchema, MessageType fileSchema) {
+    return SparkParquetReaders.buildReader(expectedSchema, fileSchema, Collections.emptyMap());
+  }
+
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader buildReader(Schema expectedSchema,
+                                               MessageType fileSchema,
+                                               Map partitionValues) {
     if (ParquetSchemaUtil.hasIds(fileSchema)) {
       return (ParquetValueReader)
           TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new ReadBuilder(fileSchema));
+              new ReadBuilder(fileSchema, partitionValues));
     } else {
       return (ParquetValueReader)
           TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new FallbackReadBuilder(fileSchema));
+              new FallbackReadBuilder(fileSchema, partitionValues));
     }
   }
 
   private static class FallbackReadBuilder extends ReadBuilder {
-    FallbackReadBuilder(MessageType type) {
-      super(type);
+    FallbackReadBuilder(MessageType type, Map partitionValues) {
+      super(type, partitionValues);
     }
 
     @Override
@@ -113,9 +120,11 @@ public ParquetValueReader struct(Types.StructType ignored, GroupType struct,
 
   private static class ReadBuilder extends TypeWithSchemaVisitor> {
     private final MessageType type;
+    private final Map partitionValues;
 
-    ReadBuilder(MessageType type) {
+    ReadBuilder(MessageType type, Map partitionValues) {
       this.type = type;
+      this.partitionValues = partitionValues;
     }
 
     @Override
@@ -146,13 +155,18 @@ public ParquetValueReader struct(Types.StructType expected, GroupType struct,
       List types = Lists.newArrayListWithExpectedSize(expectedFields.size());
       for (Types.NestedField field : expectedFields) {
         int id = field.fieldId();
-        ParquetValueReader reader = readersById.get(id);
-        if (reader != null) {
-          reorderedFields.add(reader);
-          types.add(typesById.get(id));
-        } else {
-          reorderedFields.add(ParquetValueReaders.nulls());
+        if (partitionValues.containsKey(id)) {
+          reorderedFields.add(ParquetValueReaders.constant(partitionValues.get(id)));
           types.add(null);
+        } else {
+          ParquetValueReader reader = readersById.get(id);
+          if (reader != null) {
+            reorderedFields.add(reader);
+            types.add(typesById.get(id));
+          } else {
+            reorderedFields.add(ParquetValueReaders.nulls());
+            types.add(null);
+          }
         }
       }
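The reworked `struct()` visitor above swaps in a constant reader for any field whose ID appears in the partition-value map, so identity-partitioned columns no longer have to be materialized in the data files. A minimal caller-side sketch of the new `buildReader` overload follows; the field ID, the constant value, and the generic parameters are illustrative assumptions (the extracted diff drops the generics), and `expectedSchema`/`fileSchema` are taken as given.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.spark.data.SparkParquetReaders;
import org.apache.parquet.schema.MessageType;

public class ConstantColumnSketch {
  // Sketch: build a reader that fills field ID 1 (an identity-partitioned column)
  // from partition metadata instead of reading it from the Parquet file.
  static ParquetValueReader<?> readerWithConstantId(Schema expectedSchema, MessageType fileSchema) {
    Map<Integer, Object> partitionValues = new HashMap<>();
    partitionValues.put(1, 42);  // hypothetical partition value for field ID 1

    // New overload added by this patch; the two-argument overload delegates here with an empty map.
    return SparkParquetReaders.buildReader(expectedSchema, fileSchema, partitionValues);
  }
}
```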
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 43c966af0a1e..bb71d8069592 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -24,6 +24,7 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
@@ -31,8 +32,6 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataTask;
@@ -61,14 +60,11 @@
 import org.apache.iceberg.spark.data.SparkAvroReader;
 import org.apache.iceberg.spark.data.SparkOrcReader;
 import org.apache.iceberg.spark.data.SparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ByteBuffers;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.catalyst.expressions.AttributeReference;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.catalyst.expressions.JoinedRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -84,7 +80,6 @@
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.DecimalType;
 import org.apache.spark.sql.types.StringType;
-import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.slf4j.Logger;
@@ -396,48 +391,25 @@ private Iterator open(FileScanTask task) {
       // schema or rows returned by readers
       Schema finalSchema = expectedSchema;
       PartitionSpec spec = task.spec();
-      Set idColumns = spec.identitySourceIds();
 
       // schema needed for the projection and filtering
       StructType sparkType = SparkSchemaUtil.convert(finalSchema);
       Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, sparkType, task.residual(), caseSensitive);
-      boolean hasJoinedPartitionColumns = !idColumns.isEmpty();
       boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size();
 
-      Schema iterSchema;
-      Iterator iter;
-
-      if (hasJoinedPartitionColumns) {
-        // schema used to read data files
-        Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
-        Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
-        PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
-        JoinedRow joined = new JoinedRow();
-
-        InternalRow partition = convertToRow.apply(file.partition());
-        joined.withRight(partition);
-
-        // create joined rows and project from the joined schema to the final schema
-        iterSchema = TypeUtil.join(readSchema, partitionSchema);
-        iter = Iterators.transform(open(task, readSchema), joined::withLeft);
-
-      } else if (hasExtraFilterColumns) {
-        // add projection to the final schema
-        iterSchema = requiredSchema;
-        iter = open(task, requiredSchema);
+      // build a map of partition values for reconstructing records
+      Map partitionValues = partitionMap(spec, file.partition());
+
+      if (hasExtraFilterColumns) {
+        return Iterators.transform(
+            open(task, requiredSchema, partitionValues),
+            APPLY_PROJECTION.bind(projection(finalSchema, requiredSchema))::invoke);
       } else {
-        // return the base iterator
-        iterSchema = finalSchema;
-        iter = open(task, finalSchema);
+        return open(task, finalSchema, partitionValues);
       }
-
-      // TODO: remove the projection by reporting the iterator's schema back to Spark
-      return Iterators.transform(iter,
-          APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke);
     }
 
-    private Iterator open(FileScanTask task, Schema readSchema) {
+    private Iterator open(FileScanTask task, Schema readSchema, Map partitionValues) {
       CloseableIterable iter;
       if (task.isDataTask()) {
         iter = newDataIterable(task.asDataTask(), readSchema);
@@ -448,7 +420,7 @@ private Iterator open(FileScanTask task, Schema readSchema) {
         switch (task.file().format()) {
           case PARQUET:
-            iter = newParquetIterable(location, task, readSchema);
+            iter = newParquetIterable(location, task, readSchema, partitionValues);
             break;
 
           case AVRO:
@@ -504,12 +476,14 @@ private CloseableIterable newAvroIterable(InputFile location,
     }
 
     private CloseableIterable newParquetIterable(InputFile location,
-                                                 FileScanTask task,
-                                                 Schema readSchema) {
+                                                 FileScanTask task,
+                                                 Schema readSchema,
+                                                 Map partitionValues) {
+
       return Parquet.read(location)
           .project(readSchema)
           .split(task.start(), task.length())
-          .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema))
+          .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, partitionValues))
           .filter(task.residual())
           .caseSensitive(caseSensitive)
           .build();
@@ -533,52 +507,6 @@ private CloseableIterable newDataIterable(DataTask task, Schema rea
       return CloseableIterable.transform(
           asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke);
     }
-  }
-
-  private static class PartitionRowConverter implements Function {
-    private final DataType[] types;
-    private final int[] positions;
-    private final Class[] javaTypes;
-    private final GenericInternalRow reusedRow;
-
-    PartitionRowConverter(Schema partitionSchema, PartitionSpec spec) {
-      StructType partitionType = SparkSchemaUtil.convert(partitionSchema);
-      StructField[] fields = partitionType.fields();
-
-      this.types = new DataType[fields.length];
-      this.positions = new int[types.length];
-      this.javaTypes = new Class[types.length];
-      this.reusedRow = new GenericInternalRow(types.length);
-
-      List partitionFields = spec.fields();
-      for (int rowIndex = 0; rowIndex < fields.length; rowIndex += 1) {
-        this.types[rowIndex] = fields[rowIndex].dataType();
-
-        int sourceId = partitionSchema.columns().get(rowIndex).fieldId();
-        for (int specIndex = 0; specIndex < partitionFields.size(); specIndex += 1) {
-          PartitionField field = spec.fields().get(specIndex);
-          if (field.sourceId() == sourceId && "identity".equals(field.transform().toString())) {
-            positions[rowIndex] = specIndex;
-            javaTypes[rowIndex] = spec.javaClasses()[specIndex];
-            break;
-          }
-        }
-      }
-    }
-
-    @Override
-    public InternalRow apply(StructLike tuple) {
-      for (int i = 0; i < types.length; i += 1) {
-        Object value = tuple.get(positions[i], javaTypes[i]);
-        if (value != null) {
-          reusedRow.update(i, convert(value, types[i]));
-        } else {
-          reusedRow.setNullAt(i);
-        }
-      }
-
-      return reusedRow;
-    }
 
     /**
      * Converts the objects into instances used by Spark's InternalRow.
@@ -588,48 +516,39 @@ public InternalRow apply(StructLike tuple) {
      * @return the value converted to the representation expected by Spark's InternalRow.
      */
     private static Object convert(Object value, DataType type) {
-      if (type instanceof StringType) {
-        return UTF8String.fromString(value.toString());
-      } else if (type instanceof BinaryType) {
-        return ByteBuffers.toByteArray((ByteBuffer) value);
-      } else if (type instanceof DecimalType) {
-        return Decimal.fromDecimal(value);
+      if (value != null) {
+        if (type instanceof StringType) {
+          return UTF8String.fromString(value.toString());
+        } else if (type instanceof BinaryType) {
+          return ByteBuffers.toByteArray((ByteBuffer) value);
+        } else if (type instanceof DecimalType) {
+          return Decimal.fromDecimal(value);
+        }
       }
       return value;
     }
-  }
 
-  private static class StructLikeInternalRow implements StructLike {
-    private final DataType[] types;
-    private InternalRow row = null;
-
-    StructLikeInternalRow(StructType struct) {
-      this.types = new DataType[struct.size()];
-      StructField[] fields = struct.fields();
-      for (int i = 0; i < fields.length; i += 1) {
-        types[i] = fields[i].dataType();
+    /**
+     * Creates a map from field ID to Spark value for a partition tuple.
+     *
+     * @param spec a partition spec
+     * @param partition a partition tuple
+     * @return a map from field ID to Spark value
+     */
+    private static Map partitionMap(PartitionSpec spec, StructLike partition) {
+      Map partitionValues = Maps.newHashMap();
+
+      List fields = spec.fields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        PartitionField field = fields.get(i);
+        if ("identity".equals(field.transform().toString())) {
+          partitionValues.put(field.sourceId(), convert(
+              partition.get(i, spec.javaClasses()[i]),
+              SparkSchemaUtil.convert(spec.partitionType().field(field.name()).type())));
+        }
       }
-    }
 
-    public StructLikeInternalRow setRow(InternalRow row) {
-      this.row = row;
-      return this;
-    }
-
-    @Override
-    public int size() {
-      return types.length;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public T get(int pos, Class javaClass) {
-      return javaClass.cast(row.get(pos, types[pos]));
-    }
-
-    @Override
-    public void set(int pos, T value) {
-      throw new UnsupportedOperationException("Not implemented: set");
+      return partitionValues;
     }
   }
 }
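`partitionMap` above keeps only identity transforms: a derived value such as `day(timestamp)` cannot stand in for its source column, so that column is still read from the data file. Below is a self-contained sketch, reusing the schema and spec from the tests that follow, of which partition fields would contribute constants; the class name and printed labels are hypothetical.

```java
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.optional;

public class IdentityPartitionSketch {
  public static void main(String[] args) {
    // Same schema and spec as the nested-partitioning tests below.
    Schema schema = new Schema(
        optional(1, "id", Types.IntegerType.get()),
        optional(2, "data", Types.StringType.get()),
        optional(3, "nestedData", Types.StructType.of(
            optional(4, "id", Types.IntegerType.get()),
            optional(5, "moreData", Types.StringType.get()))),
        optional(6, "timestamp", Types.TimestampType.withZone()));

    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .identity("id")
        .day("timestamp")
        .identity("nestedData.moreData")
        .build();

    for (PartitionField field : spec.fields()) {
      // Only identity transforms can become per-file constants; day, bucket,
      // truncate, etc. are derived, so their source columns are read normally.
      boolean constant = "identity".equals(field.transform().toString());
      System.out.println(field.name() + " -> "
          + (constant ? "constant from partition tuple" : "read from data file"));
    }
  }
}
```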
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestAvroWrite.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestAvroWrite.java
new file mode 100644
index 000000000000..1a79a8e43d26
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestAvroWrite.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestAvroWrite {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private static SparkSession spark = null;
+
+  @BeforeClass
+  public static void startSpark() {
+    TestAvroWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestAvroWrite.spark;
+    TestAvroWrite.spark = null;
+    currentSpark.stop();
+  }
+
+  @Test
+  public void testNestedPartitioning() throws IOException {
+    Schema nestedSchema = new Schema(
+        optional(1, "id", Types.IntegerType.get()),
+        optional(2, "data", Types.StringType.get()),
+        optional(3, "nestedData", Types.StructType.of(
+            optional(4, "id", Types.IntegerType.get()),
+            optional(5, "moreData", Types.StringType.get()))),
+        optional(6, "timestamp", Types.TimestampType.withZone())
+    );
+
+    File parent = temp.newFolder("parquet");
+    File location = new File(parent, "test/iceberg");
+
+    HadoopTables tables = new HadoopTables(new Configuration());
+    PartitionSpec spec = PartitionSpec.builderFor(nestedSchema)
+        .identity("id")
+        .day("timestamp")
+        .identity("nestedData.moreData")
+        .build();
+    Table table = tables.create(nestedSchema, spec, ImmutableMap.of("write.format.default", "avro"),
+        location.toString());
+
+    List jsons = Lists.newArrayList(
+        "{ \"id\": 1, \"data\": \"a\", \"nestedData\": { \"id\": 100, \"moreData\": \"p1\"}, " +
+            "\"timestamp\": \"2017-12-01T10:12:55.034Z\" }",
+        "{ \"id\": 2, \"data\": \"b\", \"nestedData\": { \"id\": 200, \"moreData\": \"p1\"}, " +
+            "\"timestamp\": \"2017-12-02T10:12:55.034Z\" }",
+        "{ \"id\": 3, \"data\": \"c\", \"nestedData\": { \"id\": 300, \"moreData\": \"p2\"}, " +
+            "\"timestamp\": \"2017-12-03T10:12:55.034Z\" }",
+        "{ \"id\": 4, \"data\": \"d\", \"nestedData\": { \"id\": 400, \"moreData\": \"p2\"}, " +
+            "\"timestamp\": \"2017-12-04T10:12:55.034Z\" }"
+    );
+
+    Dataset df = spark.read().schema(SparkSchemaUtil.convert(nestedSchema))
+        .json(spark.createDataset(jsons, Encoders.STRING()));
+
+    // TODO: incoming columns must be ordered according to the table's schema
+    df.select("id", "data", "nestedData", "timestamp").write()
+        .format("iceberg")
+        .mode("append")
+        .save(location.toString());
+
+    table.refresh();
+
+    Dataset result = spark.read()
+        .format("iceberg")
+        .load(location.toString());
+
+    List actual = result.orderBy("id").collectAsList();
+    Assert.assertEquals("Number of rows should match", jsons.size(), actual.size());
+    Assert.assertEquals("Row 1 col 1 is 1", 1, actual.get(0).getInt(0));
+    Assert.assertEquals("Row 1 col 2 is a", "a", actual.get(0).getString(1));
+    Assert.assertEquals("Row 1 col 3,1 is 100", 100, actual.get(0).getStruct(2).getInt(0));
+    Assert.assertEquals("Row 1 col 3,2 is p1", "p1", actual.get(0).getStruct(2).getString(1));
+    Assert.assertEquals("Row 1 col 4 is 2017-12-01T10:12:55.034+00:00",
+        0, actual.get(0).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-01T10:12:55.034Z"))));
+    Assert.assertEquals("Row 2 col 1 is 2", 2, actual.get(1).getInt(0));
+    Assert.assertEquals("Row 2 col 2 is b", "b", actual.get(1).getString(1));
+    Assert.assertEquals("Row 2 col 3,1 is 200", 200, actual.get(1).getStruct(2).getInt(0));
+    Assert.assertEquals("Row 2 col 3,2 is p1", "p1", actual.get(1).getStruct(2).getString(1));
+    Assert.assertEquals("Row 2 col 4 is 2017-12-02 12:12:55.034",
+        0, actual.get(1).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-02T10:12:55.034Z"))));
+    Assert.assertEquals("Row 3 col 1 is 3", 3, actual.get(2).getInt(0));
+    Assert.assertEquals("Row 3 col 2 is c", "c", actual.get(2).getString(1));
+    Assert.assertEquals("Row 3 col 3,1 is 300", 300, actual.get(2).getStruct(2).getInt(0));
+    Assert.assertEquals("Row 3 col 3,2 is p2", "p2", actual.get(2).getStruct(2).getString(1));
+    Assert.assertEquals("Row 3 col 4 is 2017-12-03 12:12:55.034",
+        0, actual.get(2).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-03T10:12:55.034Z"))));
+    Assert.assertEquals("Row 4 col 1 is 4", 4, actual.get(3).getInt(0));
+    Assert.assertEquals("Row 4 col 2 is d", "d", actual.get(3).getString(1));
+    Assert.assertEquals("Row 4 col 3,1 is 400", 400, actual.get(3).getStruct(2).getInt(0));
+    Assert.assertEquals("Row 4 col 3,2 is p2", "p2", actual.get(3).getStruct(2).getString(1));
+    Assert.assertEquals("Row 4 col 4 is 2017-12-04 12:12:55.034",
+        0, actual.get(3).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-04T10:12:55.034Z"))));
+  }
+}
Assert.assertEquals("Number of rows should match", expected.size(), actual.size()); Assert.assertEquals("Result rows should match", expected, actual); } + + @Test + public void testNestedPartitioning() throws IOException { + Schema nestedSchema = new Schema( + optional(1, "id", Types.IntegerType.get()), + optional(2, "data", Types.StringType.get()), + optional(3, "nestedData", Types.StructType.of( + optional(4, "id", Types.IntegerType.get()), + optional(5, "moreData", Types.StringType.get()))), + optional(6, "timestamp", Types.TimestampType.withZone()) + ); + + File parent = temp.newFolder("parquet"); + File location = new File(parent, "test"); + + HadoopTables tables = new HadoopTables(new Configuration()); + PartitionSpec spec = PartitionSpec.builderFor(nestedSchema) + .identity("id") + .day("timestamp") + .identity("nestedData.moreData") + .build(); + Table table = tables.create(nestedSchema, spec, location.toString()); + + List jsons = Lists.newArrayList( + "{ \"id\": 1, \"data\": \"a\", \"nestedData\": { \"id\": 100, \"moreData\": \"p1\"}, " + + "\"timestamp\": \"2017-12-01T10:12:55.034Z\" }", + "{ \"id\": 2, \"data\": \"b\", \"nestedData\": { \"id\": 200, \"moreData\": \"p1\"}, " + + "\"timestamp\": \"2017-12-02T10:12:55.034Z\" }", + "{ \"id\": 3, \"data\": \"c\", \"nestedData\": { \"id\": 300, \"moreData\": \"p2\"}, " + + "\"timestamp\": \"2017-12-03T10:12:55.034Z\" }", + "{ \"id\": 4, \"data\": \"d\", \"nestedData\": { \"id\": 400, \"moreData\": \"p2\"}, " + + "\"timestamp\": \"2017-12-04T10:12:55.034Z\" }" + ); + Dataset df = spark.read().schema(SparkSchemaUtil.convert(nestedSchema)) + .json(spark.createDataset(jsons, Encoders.STRING())); + + // TODO: incoming columns must be ordered according to the table's schema + df.select("id", "data", "nestedData", "timestamp").write() + .format("iceberg") + .mode("append") + .save(location.toString()); + + table.refresh(); + + Dataset result = spark.read() + .format("iceberg") + .load(location.toString()); + + List actual = result.orderBy("id").collectAsList(); + Assert.assertEquals("Number of rows should match", jsons.size(), actual.size()); + Assert.assertEquals("Row 1 col 1 is 1", 1, actual.get(0).getInt(0)); + Assert.assertEquals("Row 1 col 2 is a", "a", actual.get(0).getString(1)); + Assert.assertEquals("Row 1 col 3,1 is 100", 100, actual.get(0).getStruct(2).getInt(0)); + Assert.assertEquals("Row 1 col 3,2 is p1", "p1", actual.get(0).getStruct(2).getString(1)); + Assert.assertEquals("Row 1 col 4 is 2017-12-01T10:12:55.034+00:00", + 0, actual.get(0).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-01T10:12:55.034Z")))); + Assert.assertEquals("Row 2 col 1 is 2", 2, actual.get(1).getInt(0)); + Assert.assertEquals("Row 2 col 2 is b", "b", actual.get(1).getString(1)); + Assert.assertEquals("Row 2 col 3,1 is 200", 200, actual.get(1).getStruct(2).getInt(0)); + Assert.assertEquals("Row 2 col 3,2 is p1", "p1", actual.get(1).getStruct(2).getString(1)); + Assert.assertEquals("Row 2 col 4 is 2017-12-02 12:12:55.034", + 0, actual.get(1).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-02T10:12:55.034Z")))); + Assert.assertEquals("Row 3 col 1 is 3", 3, actual.get(2).getInt(0)); + Assert.assertEquals("Row 3 col 2 is c", "c", actual.get(2).getString(1)); + Assert.assertEquals("Row 3 col 3,1 is 300", 300, actual.get(2).getStruct(2).getInt(0)); + Assert.assertEquals("Row 3 col 3,2 is p2", "p2", actual.get(2).getStruct(2).getString(1)); + Assert.assertEquals("Row 3 col 4 is 2017-12-03 12:12:55.034", + 0, 
actual.get(2).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-03T10:12:55.034Z")))); + Assert.assertEquals("Row 4 col 1 is 4", 4, actual.get(3).getInt(0)); + Assert.assertEquals("Row 4 col 2 is d", "d", actual.get(3).getString(1)); + Assert.assertEquals("Row 4 col 3,1 is 400", 400, actual.get(3).getStruct(2).getInt(0)); + Assert.assertEquals("Row 4 col 3,2 is p2", "p2", actual.get(3).getStruct(2).getString(1)); + Assert.assertEquals("Row 4 col 4 is 2017-12-04 12:12:55.034", + 0, actual.get(3).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-04T10:12:55.034Z")))); + } }
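For context, a hedged sketch of the read-side behavior these tests rely on: once identity partition values are injected as constants by the readers, projecting or filtering on a partition source column such as the nested `nestedData.moreData` works even when the value is not stored in the data files. The helper class and its parameters are illustrative; the `spark` session and table location are assumed to be set up as in the tests.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadBackSketch {
  // Sketch: read the table written by the tests and project the identity-partitioned
  // nested column, which is reconstructed from partition metadata rather than file data.
  static void show(SparkSession spark, String tableLocation) {
    Dataset<Row> rows = spark.read()
        .format("iceberg")
        .load(tableLocation);

    rows.select("id", "nestedData.moreData")
        .where("nestedData.moreData = 'p1'")
        .show();
  }
}
```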