diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index cbbf77c1e146..7d63509ce41f 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -40,7 +40,6 @@
 import org.apache.avro.io.ResolvingDecoder;
 import org.apache.avro.util.Utf8;
 import org.apache.iceberg.common.DynConstructors;
-import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
 import static java.util.Collections.emptyIterator;
@@ -580,10 +579,9 @@ protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Ma
       List<Object> constantList = Lists.newArrayListWithCapacity(fields.size());
       for (int pos = 0; pos < fields.size(); pos += 1) {
         Types.NestedField field = fields.get(pos);
-        Object constant = idToConstant.get(field.fieldId());
-        if (constant != null) {
+        if (idToConstant.containsKey(field.fieldId())) {
           positionList.add(pos);
-          constantList.add(prepareConstant(field.type(), constant));
+          constantList.add(idToConstant.get(field.fieldId()));
         }
       }
 
@@ -597,10 +595,6 @@ protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Ma
 
     protected abstract void set(S struct, int pos, Object value);
 
-    protected Object prepareConstant(Type type, Object value) {
-      return value;
-    }
-
     public ValueReader<?> reader(int pos) {
       return readers[pos];
     }
diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
index 9a2aa992b04f..1ef67db2aa4f 100644
--- a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
@@ -22,26 +22,36 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.BiFunction;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 
 public class PartitionUtil {
   private PartitionUtil() {
   }
 
   public static Map<Integer, ?> constantsMap(FileScanTask task) {
-    return constantsMap(task.spec(), task.file().partition());
+    return constantsMap(task, (type, constant) -> constant);
   }
 
-  private static Map<Integer, ?> constantsMap(PartitionSpec spec, StructLike partitionData) {
+  public static Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> convertConstant) {
+    return constantsMap(task.spec(), task.file().partition(), convertConstant);
+  }
+
+  private static Map<Integer, ?> constantsMap(PartitionSpec spec, StructLike partitionData,
+                                              BiFunction<Type, Object, Object> convertConstant) {
     // use java.util.HashMap because partition data may contain null values
     Map<Integer, Object> idToConstant = new HashMap<>();
+    List<Types.NestedField> partitionFields = spec.partitionType().fields();
    List<PartitionField> fields = spec.fields();
     for (int pos = 0; pos < fields.size(); pos += 1) {
       PartitionField field = fields.get(pos);
-      idToConstant.put(field.sourceId(), partitionData.get(pos, Object.class));
+      Object converted = convertConstant.apply(partitionFields.get(pos).type(), partitionData.get(pos, Object.class));
+      idToConstant.put(field.sourceId(), converted);
     }
     return idToConstant;
   }
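Note: the new two-argument constantsMap overload lets each engine plug in its own conversion of raw partition values, while the one-argument form now just passes values through unchanged. A minimal sketch of a caller; the class and method names below are illustrative only and not part of this change:

import java.util.Map;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.util.PartitionUtil;

class ConstantsMapSketch {
  // Builds the field-id -> constant map for a scan task, converting each
  // partition value with an engine-specific function. The identity lambda
  // below mirrors what the single-argument overload does internally.
  static Map<Integer, ?> constantsFor(FileScanTask task) {
    return PartitionUtil.constantsMap(task, (type, value) -> value);
  }
}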
diff --git a/data/src/main/java/org/apache/iceberg/data/DateTimeUtil.java b/data/src/main/java/org/apache/iceberg/data/DateTimeUtil.java
new file mode 100644
index 000000000000..d6ab178a9523
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/DateTimeUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+
+public class DateTimeUtil {
+  private DateTimeUtil() {
+  }
+
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+  public static LocalDate dateFromDays(int daysFromEpoch) {
+    return ChronoUnit.DAYS.addTo(EPOCH_DAY, daysFromEpoch);
+  }
+
+  public static LocalTime timeFromMicros(long microFromMidnight) {
+    return LocalTime.ofNanoOfDay(microFromMidnight * 1000);
+  }
+
+  public static LocalDateTime timestampFromMicros(long microsFromEpoch) {
+    return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch).toLocalDateTime();
+  }
+
+  public static OffsetDateTime timestamptzFromMicros(long microsFromEpoch) {
+    return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch);
+  }
+}
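Note: all four helpers are epoch-relative, matching how Iceberg stores dates (days since 1970-01-01) and times/timestamps (microseconds). A small usage sketch with values chosen for illustration:

import java.time.LocalDate;
import java.time.LocalTime;
import org.apache.iceberg.data.DateTimeUtil;

class DateTimeUtilSketch {
  public static void main(String[] args) {
    LocalDate epoch = DateTimeUtil.dateFromDays(0);             // 1970-01-01
    LocalDate later = DateTimeUtil.dateFromDays(18262);         // 2020-01-01
    LocalTime time = DateTimeUtil.timeFromMicros(12_345_000L);  // 00:00:12.345
    System.out.println(epoch + " " + later + " " + time);
  }
}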
diff --git a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
index baa2320244ce..a683c019844f 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
@@ -28,6 +28,7 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.HasTableOperations;
@@ -46,6 +47,8 @@
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PartitionUtil;
 
 class TableScanIterable extends CloseableGroup implements CloseableIterable<Record> {
@@ -76,7 +79,7 @@ public Iterator<Record> iterator() {
   private CloseableIterable<Record> open(FileScanTask task) {
     InputFile input = ops.io().newInputFile(task.file().path().toString());
-    Map<Integer, ?> partition = PartitionUtil.constantsMap(task);
+    Map<Integer, ?> partition = PartitionUtil.constantsMap(task, TableScanIterable::convertConstant);
 
     // TODO: join to partition data from the manifest file
     switch (task.file().format()) {
@@ -96,7 +99,7 @@ private CloseableIterable<Record> open(FileScanTask task) {
       case PARQUET:
         Parquet.ReadBuilder parquet = Parquet.read(input)
             .project(projection)
-            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(projection, fileSchema))
+            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(projection, fileSchema, partition))
             .split(task.start(), task.length());
 
         if (reuseContainers) {
@@ -185,4 +188,35 @@ public void close() throws IOException {
       }
     }
   }
+
+  /**
+   * Conversions from generic Avro values to Iceberg generic values.
+   */
+  private static Object convertConstant(Type type, Object value) {
+    if (value == null) {
+      return null;
+    }
+
+    switch (type.typeId()) {
+      case STRING:
+        return value.toString();
+      case TIME:
+        return DateTimeUtil.timeFromMicros((Long) value);
+      case DATE:
+        return DateTimeUtil.dateFromDays((Integer) value);
+      case TIMESTAMP:
+        if (((Types.TimestampType) type).shouldAdjustToUTC()) {
+          return DateTimeUtil.timestamptzFromMicros((Long) value);
+        } else {
+          return DateTimeUtil.timestampFromMicros((Long) value);
+        }
+      case FIXED:
+        if (value instanceof GenericData.Fixed) {
+          return ((GenericData.Fixed) value).bytes();
+        }
+        return value;
+      default:
+    }
+    return value;
+  }
 }
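Note: in convertConstant above, the TIMESTAMP branch keys off shouldAdjustToUTC(): a timestamptz partition value becomes an OffsetDateTime, a plain timestamp becomes a LocalDateTime. The same distinction shown standalone with the new helper; the value and class name are illustrative:

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import org.apache.iceberg.data.DateTimeUtil;

class TimestampConstantSketch {
  public static void main(String[] args) {
    long micros = 1_577_836_800_000_000L; // 2020-01-01T00:00:00 UTC in microseconds from epoch
    OffsetDateTime withZone = DateTimeUtil.timestamptzFromMicros(micros); // 2020-01-01T00:00Z
    LocalDateTime local = DateTimeUtil.timestampFromMicros(micros);       // 2020-01-01T00:00
    System.out.println(withZone + " / " + local);
  }
}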
diff --git a/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java b/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
index 7502d151121c..df866726b90d 100644
--- a/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
@@ -20,18 +20,16 @@
 package org.apache.iceberg.data.avro;
 
 import java.io.IOException;
-import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Map;
 import org.apache.avro.io.Decoder;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.avro.ValueReaders;
+import org.apache.iceberg.data.DateTimeUtil;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.types.Types.StructType;
@@ -60,9 +58,6 @@ static ValueReader<Record> struct(StructType struct, List<ValueReader<?>> reader
     return new GenericRecordReader(readers, struct, idToConstant);
   }
 
-  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
-  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
-
   private static class DateReader implements ValueReader<LocalDate> {
     private static final DateReader INSTANCE = new DateReader();
 
@@ -71,7 +66,7 @@ private DateReader() {
     }
 
     @Override
     public LocalDate read(Decoder decoder, Object reuse) throws IOException {
-      return EPOCH_DAY.plusDays(decoder.readInt());
+      return DateTimeUtil.dateFromDays(decoder.readInt());
     }
   }
@@ -83,7 +78,7 @@ private TimeReader() {
     }
 
     @Override
     public LocalTime read(Decoder decoder, Object reuse) throws IOException {
-      return LocalTime.ofNanoOfDay(decoder.readLong() * 1000);
+      return DateTimeUtil.timeFromMicros(decoder.readLong());
     }
   }
@@ -95,7 +90,7 @@ private TimestampReader() {
     }
 
     @Override
     public LocalDateTime read(Decoder decoder, Object reuse) throws IOException {
-      return EPOCH.plus(decoder.readLong(), ChronoUnit.MICROS).toLocalDateTime();
+      return DateTimeUtil.timestampFromMicros(decoder.readLong());
     }
   }
@@ -107,7 +102,7 @@ private TimestamptzReader() {
     }
 
     @Override
     public OffsetDateTime read(Decoder decoder, Object reuse) throws IOException {
-      return EPOCH.plus(decoder.readLong(), ChronoUnit.MICROS);
+      return DateTimeUtil.timestamptzFromMicros(decoder.readLong());
     }
   }
diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
index 02b3bccd9d05..bc6767f7470f 100644
--- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.data.parquet;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.time.Instant;
@@ -65,23 +66,28 @@ public class GenericParquetReaders {
   private GenericParquetReaders() {
   }
 
-  @SuppressWarnings("unchecked")
   public static ParquetValueReader<Record> buildReader(Schema expectedSchema, MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<Record> buildReader(Schema expectedSchema,
+                                                       MessageType fileSchema,
+                                                       Map<Integer, ?> idToConstant) {
     if (ParquetSchemaUtil.hasIds(fileSchema)) {
       return (ParquetValueReader<Record>)
           TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new ReadBuilder(fileSchema));
+              new ReadBuilder(fileSchema, idToConstant));
     } else {
       return (ParquetValueReader<Record>)
           TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new FallbackReadBuilder(fileSchema));
+              new FallbackReadBuilder(fileSchema, idToConstant));
     }
   }
 
   private static class FallbackReadBuilder extends ReadBuilder {
-    FallbackReadBuilder(MessageType type) {
-      super(type);
+    FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      super(type, idToConstant);
     }
 
     @Override
@@ -112,9 +118,11 @@ public ParquetValueReader<?> struct(StructType expected, GroupType struct,
   private static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
     private final MessageType type;
+    private final Map<Integer, ?> idToConstant;
 
-    ReadBuilder(MessageType type) {
+    ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
       this.type = type;
+      this.idToConstant = idToConstant;
     }
 
     @Override
@@ -145,13 +153,19 @@ public ParquetValueReader<?> struct(StructType expected, GroupType struct,
       List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
       for (Types.NestedField field : expectedFields) {
         int id = field.fieldId();
-        ParquetValueReader<?> reader = readersById.get(id);
-        if (reader != null) {
-          reorderedFields.add(reader);
-          types.add(typesById.get(id));
-        } else {
-          reorderedFields.add(ParquetValueReaders.nulls());
+        if (idToConstant.containsKey(id)) {
+          // containsKey is used because the constant may be null
+          reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
           types.add(null);
+        } else {
+          ParquetValueReader<?> reader = readersById.get(id);
+          if (reader != null) {
+            reorderedFields.add(reader);
+            types.add(typesById.get(id));
+          } else {
+            reorderedFields.add(ParquetValueReaders.nulls());
+            types.add(null);
+          }
         }
       }
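Note: for columns whose id is in idToConstant, the rewritten struct() substitutes ParquetValueReaders.constant(...) instead of reading the column from the file, and containsKey is used so a null constant still takes that branch. Conceptually a constant reader is just the following; this is a sketch of the idea, not the actual ParquetValueReaders implementation:

// Sketch: a reader that ignores the file entirely and returns one value per row,
// which is how joined identity-partition columns are materialized for each record.
class ConstantReaderSketch<C> {
  private final C value;

  ConstantReaderSketch(C value) {
    this.value = value; // may be null, hence containsKey() rather than get() != null
  }

  C read(C ignoredReuse) {
    return value;
  }
}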
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 9a36266ffdf2..190b708db14d 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.spark.data;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.math.BigDecimal;
@@ -66,23 +67,29 @@ public class SparkParquetReaders {
   private SparkParquetReaders() {
   }
 
-  @SuppressWarnings("unchecked")
   public static ParquetValueReader<InternalRow> buildReader(Schema expectedSchema, MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
+
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<InternalRow> buildReader(Schema expectedSchema,
+                                                            MessageType fileSchema,
+                                                            Map<Integer, ?> idToConstant) {
     if (ParquetSchemaUtil.hasIds(fileSchema)) {
       return (ParquetValueReader<InternalRow>)
           TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new ReadBuilder(fileSchema));
+              new ReadBuilder(fileSchema, idToConstant));
     } else {
       return (ParquetValueReader<InternalRow>)
           TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new FallbackReadBuilder(fileSchema));
+              new FallbackReadBuilder(fileSchema, idToConstant));
     }
   }
 
   private static class FallbackReadBuilder extends ReadBuilder {
-    FallbackReadBuilder(MessageType type) {
-      super(type);
+    FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      super(type, idToConstant);
     }
 
     @Override
@@ -113,9 +120,11 @@ public ParquetValueReader<?> struct(Types.StructType ignored, GroupType struct,
   private static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
     private final MessageType type;
+    private final Map<Integer, ?> idToConstant;
 
-    ReadBuilder(MessageType type) {
+    ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
       this.type = type;
+      this.idToConstant = idToConstant;
     }
 
     @Override
@@ -146,13 +155,19 @@ public ParquetValueReader<?> struct(Types.StructType expected, GroupType struct,
       List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
       for (Types.NestedField field : expectedFields) {
         int id = field.fieldId();
-        ParquetValueReader<?> reader = readersById.get(id);
-        if (reader != null) {
-          reorderedFields.add(reader);
-          types.add(typesById.get(id));
-        } else {
-          reorderedFields.add(ParquetValueReaders.nulls());
+        if (idToConstant.containsKey(id)) {
+          // containsKey is used because the constant may be null
+          reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
           types.add(null);
+        } else {
+          ParquetValueReader<?> reader = readersById.get(id);
+          if (reader != null) {
+            reorderedFields.add(reader);
+            types.add(typesById.get(id));
+          } else {
+            reorderedFields.add(ParquetValueReaders.nulls());
+            types.add(null);
+          }
         }
       }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
index b799fe8c0cf5..7408ca20be58 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
@@ -29,14 +29,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import org.apache.avro.generic.GenericData;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.util.Utf8;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.avro.ValueReaders;
-import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.ByteBuffers;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
@@ -287,30 +284,5 @@ protected void set(InternalRow struct, int pos, Object value) {
         struct.setNullAt(pos);
       }
     }
-
-    @Override
-    protected Object prepareConstant(Type type, Object value) {
-      switch (type.typeId()) {
-        case DECIMAL:
-          return Decimal.apply((BigDecimal) value);
-        case STRING:
-          if (value instanceof Utf8) {
-            Utf8 utf8 = (Utf8) value;
-            return UTF8String.fromBytes(utf8.getBytes(), 0, utf8.getByteLength());
-          }
-          return UTF8String.fromString(value.toString());
-        case FIXED:
-          if (value instanceof byte[]) {
-            return value;
-          } else if (value instanceof GenericData.Fixed) {
-            return ((GenericData.Fixed) value).bytes();
-          }
-          return ByteBuffers.toByteArray((ByteBuffer) value);
-        case BINARY:
-          return ByteBuffers.toByteArray((ByteBuffer) value);
-        default:
-      }
-      return value;
-    }
   }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 61c6fa227485..ec9aa706ef09 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -24,10 +24,14 @@
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataTask;
@@ -47,8 +51,10 @@
 import org.apache.iceberg.spark.data.SparkAvroReader;
 import org.apache.iceberg.spark.data.SparkOrcReader;
 import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.PartitionUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -56,11 +62,13 @@
 import org.apache.spark.sql.catalyst.expressions.AttributeReference;
 import org.apache.spark.sql.catalyst.expressions.JoinedRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 import scala.collection.JavaConverters;
 
 class RowDataReader extends BaseDataReader<InternalRow> {
-  private static final Set<FileFormat> SUPPORTS_CONSTANTS = Sets.newHashSet(FileFormat.AVRO);
+  private static final Set<FileFormat> SUPPORTS_CONSTANTS = Sets.newHashSet(FileFormat.AVRO, FileFormat.PARQUET);
   // for some reason, the apply method can't be called from Java without reflection
   private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
       .impl(UnsafeProjection.class, InternalRow.class)
@@ -103,7 +111,7 @@ Iterator<InternalRow> open(FileScanTask task) {
     if (hasJoinedPartitionColumns) {
       if (SUPPORTS_CONSTANTS.contains(file.format())) {
         iterSchema = requiredSchema;
-        iter = open(task, requiredSchema, PartitionUtil.constantsMap(task));
+        iter = open(task, requiredSchema, PartitionUtil.constantsMap(task, RowDataReader::convertConstant));
       } else {
         // schema used to read data files
         Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
@@ -144,7 +152,7 @@ private Iterator<InternalRow> open(FileScanTask task, Schema readSchema, Map<In
…
@@ … @@ private CloseableIterable<InternalRow> newAvroIterable(
   private CloseableIterable<InternalRow> newParquetIterable(
       InputFile location, FileScanTask task,
-      Schema readSchema) {
+      Schema readSchema,
+      Map<Integer, ?> idToConstant) {
     return Parquet.read(location)
         .project(readSchema)
         .split(task.start(), task.length())
-        .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema))
+        .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant))
         .filter(task.residual())
         .caseSensitive(caseSensitive)
         .build();
@@ -233,4 +242,32 @@ private static UnsafeProjection projection(Schema finalSchema, Schema readSchema
         JavaConverters.asScalaBufferConverter(exprs).asScala().toSeq(),
         JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq());
   }
+
+  private static Object convertConstant(Type type, Object value) {
+    if (value == null) {
+      return null;
+    }
+
+    switch (type.typeId()) {
+      case DECIMAL:
+        return Decimal.apply((BigDecimal) value);
+      case STRING:
+        if (value instanceof Utf8) {
+          Utf8 utf8 = (Utf8) value;
+          return UTF8String.fromBytes(utf8.getBytes(), 0, utf8.getByteLength());
+        }
+        return UTF8String.fromString(value.toString());
+      case FIXED:
+        if (value instanceof byte[]) {
+          return value;
+        } else if (value instanceof GenericData.Fixed) {
+          return ((GenericData.Fixed) value).bytes();
+        }
+        return ByteBuffers.toByteArray((ByteBuffer) value);
+      case BINARY:
+        return ByteBuffers.toByteArray((ByteBuffer) value);
+      default:
+    }
+    return value;
+  }
 }
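Note: RowDataReader.convertConstant mirrors the generic version but targets Spark's internal representations: strings become UTF8String, decimals become org.apache.spark.sql.types.Decimal, and fixed/binary values become byte arrays. A standalone illustration of the same conversions; the values and class name are illustrative only:

import java.math.BigDecimal;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;

class SparkConstantSketch {
  public static void main(String[] args) {
    // The conversions the STRING and DECIMAL branches apply to partition constants:
    UTF8String region = UTF8String.fromString("us-east-1");
    Decimal price = Decimal.apply(new BigDecimal("12.50"));
    System.out.println(region + " " + price);
  }
}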
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
index 059d744ebffd..e6c7621de2b0 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
@@ -41,6 +41,7 @@
 import org.apache.spark.sql.SparkSession;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -307,4 +308,72 @@ public void testPartitionValueTypes() throws Exception {
       TestTables.clearTables();
     }
   }
+
+  @Test
+  public void testNestedPartitionValues() throws Exception {
+    Assume.assumeTrue("ORC can't project nested partition values", !format.equalsIgnoreCase("orc"));
+
+    String[] columnNames = new String[] {
+        "b", "i", "l", "f", "d", "date", "ts", "s", "bytes", "dec_9_0", "dec_11_2", "dec_38_10"
+    };
+
+    HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
+    Schema nestedSchema = new Schema(optional(1, "nested", SUPPORTED_PRIMITIVES.asStruct()));
+
+    // create a table around the source data
+    String sourceLocation = temp.newFolder("source_table").toString();
+    Table source = tables.create(nestedSchema, sourceLocation);
+
+    // write out an Avro data file with all of the data types for source data
+    List<GenericData.Record> expected = RandomData.generateList(source.schema(), 2, 128735L);
+    File avroData = temp.newFile("data.avro");
+    Assert.assertTrue(avroData.delete());
+    try (FileAppender<GenericData.Record> appender = Avro.write(Files.localOutput(avroData))
+        .schema(source.schema())
+        .build()) {
+      appender.addAll(expected);
+    }
+
+    // add the Avro data file to the source table
+    source.newAppend()
+        .appendFile(DataFiles.fromInputFile(Files.localInput(avroData), 10))
+        .commit();
+
+    Dataset<Row> sourceDF = spark.read().format("iceberg").load(sourceLocation);
+
+    try {
+      for (String column : columnNames) {
+        String desc = "partition_by_" + SUPPORTED_PRIMITIVES.findType(column).toString();
+
+        File parent = temp.newFolder(desc);
+        File location = new File(parent, "test");
+        File dataFolder = new File(location, "data");
+        Assert.assertTrue("mkdirs should succeed", dataFolder.mkdirs());
+
+        PartitionSpec spec = PartitionSpec.builderFor(nestedSchema).identity("nested." + column).build();
+
+        Table table = tables.create(nestedSchema, spec, location.toString());
+        table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
+
+        sourceDF.write()
+            .format("iceberg")
+            .mode("append")
+            .save(location.toString());
+
+        List<Row> actual = spark.read()
+            .format("iceberg")
+            .load(location.toString())
+            .collectAsList();
+
+        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+
+        for (int i = 0; i < expected.size(); i += 1) {
+          TestHelpers.assertEqualsSafe(
+              nestedSchema.asStruct(), expected.get(i), actual.get(i));
+        }
+      }
+    } finally {
+      TestTables.clearTables();
+    }
+  }
 }