diff --git a/api/src/main/java/org/apache/iceberg/util/StructProjection.java b/api/src/main/java/org/apache/iceberg/util/StructProjection.java index d916e7784c62..be05b0fe2db5 100644 --- a/api/src/main/java/org/apache/iceberg/util/StructProjection.java +++ b/api/src/main/java/org/apache/iceberg/util/StructProjection.java @@ -23,8 +23,11 @@ import java.util.Set; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.MapType; import org.apache.iceberg.types.Types.StructType; public class StructProjection implements StructLike { @@ -82,8 +85,31 @@ private StructProjection(StructType structType, StructType projection) { dataField.type().asStructType(), projectedField.type().asStructType()); break; case MAP: + MapType projectedMap = projectedField.type().asMapType(); + MapType originalMap = dataField.type().asMapType(); + + boolean keyProjectable = !projectedMap.keyType().isNestedType() || + projectedMap.keyType().equals(originalMap.keyType()); + boolean valueProjectable = !projectedMap.valueType().isNestedType() || + projectedMap.valueType().equals(originalMap.valueType()); + Preconditions.checkArgument(keyProjectable && valueProjectable, + "Cannot project a partial map key or value struct. Trying to project %s out of %s", + projectedField, dataField); + + nestedProjections[pos] = null; + break; case LIST: - throw new IllegalArgumentException(String.format("Cannot project list or map field: %s", projectedField)); + ListType projectedList = projectedField.type().asListType(); + ListType originalList = dataField.type().asListType(); + + boolean elementProjectable = !projectedList.elementType().isNestedType() || + projectedList.elementType().equals(originalList.elementType()); + Preconditions.checkArgument(elementProjectable, + "Cannot project a partial list element struct. Trying to project %s out of %s", + projectedField, dataField); + + nestedProjections[pos] = null; + break; default: nestedProjections[pos] = null; } diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java index c69429d80576..c1b714534def 100644 --- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java +++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java @@ -28,7 +28,6 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.ParallelIterable; import org.apache.iceberg.util.ThreadPools; @@ -102,15 +101,13 @@ protected CloseableIterable planFiles( TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean ignoreResiduals, boolean caseSensitive, boolean colStats) { CloseableIterable manifests = allManifestFiles(ops.current().snapshots()); - Type fileProjection = schema().findType("data_file"); - Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema(); String schemaString = SchemaParser.toJson(schema()); String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned()); Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter; ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter); return CloseableIterable.transform(manifests, manifest -> new ManifestEntriesTable.ManifestReadTask( - ops.io(), manifest, fileSchema, schemaString, specString, residuals, ops.current().specsById())); + ops.io(), manifest, schema(), schemaString, specString, residuals, ops.current().specsById())); } } diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java index d6719343cbc0..68439295817e 100644 --- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java +++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java @@ -31,6 +31,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructProjection; /** * A {@link Table} implementation that exposes a table's valid manifest files as rows. @@ -124,7 +125,6 @@ protected CloseableIterable planFiles( String schemaString = SchemaParser.toJson(schema()); String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned()); - // Data tasks produce the table schema, not the projection schema and projection is done by processing engines. return CloseableIterable.withNoopClose(Iterables.transform(ops.current().snapshots(), snap -> { if (snap.manifestListLocation() != null) { Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter; @@ -134,14 +134,15 @@ protected CloseableIterable planFiles( .withRecordCount(1) .withFormat(FileFormat.AVRO) .build(); - return new ManifestListReadTask(ops.io(), table().spec(), new BaseFileScanTask( + return new ManifestListReadTask(ops.io(), schema(), table().spec(), new BaseFileScanTask( manifestListAsDataFile, null, schemaString, specString, residuals)); } else { return StaticDataTask.of( ops.io().newInputFile(ops.current().metadataFileLocation()), - snap.allManifests(), - manifest -> ManifestsTable.manifestFileToRow(table().spec(), manifest)); + MANIFEST_FILE_SCHEMA, schema(), snap.allManifests(), + manifest -> ManifestsTable.manifestFileToRow(table().spec(), manifest) + ); } })); } @@ -149,11 +150,13 @@ protected CloseableIterable planFiles( static class ManifestListReadTask implements DataTask { private final FileIO io; + private final Schema schema; private final PartitionSpec spec; private final FileScanTask manifestListTask; - ManifestListReadTask(FileIO io, PartitionSpec spec, FileScanTask manifestListTask) { + ManifestListReadTask(FileIO io, Schema schema, PartitionSpec spec, FileScanTask manifestListTask) { this.io = io; + this.schema = schema; this.spec = spec; this.manifestListTask = manifestListTask; } @@ -175,9 +178,12 @@ public CloseableIterable rows() { .reuseContainers(false) .build()) { - return CloseableIterable.transform(manifests, + CloseableIterable rowIterable = CloseableIterable.transform(manifests, manifest -> ManifestsTable.manifestFileToRow(spec, manifest)); + StructProjection projection = StructProjection.create(MANIFEST_FILE_SCHEMA, schema); + return CloseableIterable.transform(rowIterable, projection::wrap); + } catch (IOException e) { throw new RuntimeIOException(e, "Cannot read manifest list file: %s", manifestListTask.file().path()); } diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java index 13d28ad52b13..145663ce4d9b 100644 --- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java @@ -109,12 +109,8 @@ protected CloseableIterable planFiles( Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter; ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter); - // Data tasks produce the table schema, not the projection schema and projection is done by processing engines. - // This data task needs to use the table schema, which may not include a partition schema to avoid having an - // empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in - // all cases. return CloseableIterable.transform(manifests, manifest -> - new ManifestReadTask(ops.io(), manifest, fileSchema, schemaString, specString, residuals)); + new ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals)); } } diff --git a/core/src/main/java/org/apache/iceberg/HistoryTable.java b/core/src/main/java/org/apache/iceberg/HistoryTable.java index 9b607ccbd6be..9d0b0a6d7daf 100644 --- a/core/src/main/java/org/apache/iceberg/HistoryTable.java +++ b/core/src/main/java/org/apache/iceberg/HistoryTable.java @@ -68,8 +68,9 @@ private DataTask task(TableScan scan) { TableOperations ops = operations(); return StaticDataTask.of( ops.io().newInputFile(ops.current().metadataFileLocation()), - ops.current().snapshotLog(), - convertHistoryEntryFunc(table())); + schema(), scan.schema(), ops.current().snapshotLog(), + convertHistoryEntryFunc(table()) + ); } private class HistoryScan extends StaticTableScan { diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java index 6b434fa1ed14..7bae3491a787 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java +++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java @@ -29,6 +29,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.StructProjection; /** * A {@link Table} implementation that exposes a table's manifest entries as rows, for both delete and data files. @@ -107,44 +108,53 @@ protected CloseableIterable planFiles( boolean ignoreResiduals, boolean caseSensitive, boolean colStats) { // return entries from both data and delete manifests CloseableIterable manifests = CloseableIterable.withNoopClose(snapshot.allManifests()); - Type fileProjection = schema().findType("data_file"); - Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema(); String schemaString = SchemaParser.toJson(schema()); String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned()); Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter; ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter); return CloseableIterable.transform(manifests, manifest -> - new ManifestReadTask(ops.io(), manifest, fileSchema, schemaString, specString, residuals, + new ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals, ops.current().specsById())); } } static class ManifestReadTask extends BaseFileScanTask implements DataTask { + private final Schema schema; private final Schema fileSchema; private final FileIO io; private final ManifestFile manifest; private final Map specsById; - ManifestReadTask(FileIO io, ManifestFile manifest, Schema fileSchema, String schemaString, + ManifestReadTask(FileIO io, ManifestFile manifest, Schema schema, String schemaString, String specString, ResidualEvaluator residuals, Map specsById) { super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals); - this.fileSchema = fileSchema; + this.schema = schema; this.io = io; this.manifest = manifest; this.specsById = specsById; + + Type fileProjection = schema.findType("data_file"); + this.fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema(); } @Override public CloseableIterable rows() { + // Project data-file fields + CloseableIterable prunedRows; if (manifest.content() == ManifestContent.DATA) { - return CloseableIterable.transform(ManifestFiles.read(manifest, io).project(fileSchema).entries(), + prunedRows = CloseableIterable.transform(ManifestFiles.read(manifest, io).project(fileSchema).entries(), file -> (GenericManifestEntry) file); } else { - return CloseableIterable.transform(ManifestFiles.readDeleteManifest(manifest, io, specsById) + prunedRows = CloseableIterable.transform(ManifestFiles.readDeleteManifest(manifest, io, specsById) .project(fileSchema).entries(), file -> (GenericManifestEntry) file); } + + // Project non-readable fields + Schema readSchema = ManifestEntry.wrapFileSchema(fileSchema.asStruct()); + StructProjection projection = StructProjection.create(readSchema, schema); + return CloseableIterable.transform(prunedRows, projection::wrap); } @Override diff --git a/core/src/main/java/org/apache/iceberg/ManifestsTable.java b/core/src/main/java/org/apache/iceberg/ManifestsTable.java index d67b8330d21d..a818840c8a7e 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestsTable.java +++ b/core/src/main/java/org/apache/iceberg/ManifestsTable.java @@ -75,8 +75,9 @@ protected DataTask task(TableScan scan) { String location = scan.snapshot().manifestListLocation(); return StaticDataTask.of( ops.io().newInputFile(location != null ? location : ops.current().metadataFileLocation()), - scan.snapshot().allManifests(), - manifest -> ManifestsTable.manifestFileToRow(spec, manifest)); + schema(), scan.schema(), scan.snapshot().allManifests(), + manifest -> ManifestsTable.manifestFileToRow(spec, manifest) + ); } private class ManifestsTableScan extends StaticTableScan { diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java index e190ca44560a..0215dfd45c00 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java +++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java @@ -76,11 +76,17 @@ private DataTask task(StaticTableScan scan) { Iterable partitions = partitions(scan); if (table().spec().fields().size() < 1) { // the table is unpartitioned, partitions contains only the root partition - return StaticDataTask.of(io().newInputFile(ops.current().metadataFileLocation()), partitions, - root -> StaticDataTask.Row.of(root.recordCount, root.fileCount)); + return StaticDataTask.of( + io().newInputFile(ops.current().metadataFileLocation()), + schema(), scan.schema(), partitions, + root -> StaticDataTask.Row.of(root.recordCount, root.fileCount) + ); } else { - return StaticDataTask.of(io().newInputFile(ops.current().metadataFileLocation()), partitions, - PartitionsTable::convertPartition); + return StaticDataTask.of( + io().newInputFile(ops.current().metadataFileLocation()), + schema(), scan.schema(), partitions, + PartitionsTable::convertPartition + ); } } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotsTable.java b/core/src/main/java/org/apache/iceberg/SnapshotsTable.java index 3501662bc46a..4bb83afedcff 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotsTable.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotsTable.java @@ -60,8 +60,9 @@ private DataTask task(BaseTableScan scan) { TableOperations ops = operations(); return StaticDataTask.of( ops.io().newInputFile(ops.current().metadataFileLocation()), - ops.current().snapshots(), - SnapshotsTable::snapshotToRow); + schema(), scan.schema(), ops.current().snapshots(), + SnapshotsTable::snapshotToRow + ); } @Override diff --git a/core/src/main/java/org/apache/iceberg/StaticDataTask.java b/core/src/main/java/org/apache/iceberg/StaticDataTask.java index 24bff01e3da7..3aabd728d06f 100644 --- a/core/src/main/java/org/apache/iceberg/StaticDataTask.java +++ b/core/src/main/java/org/apache/iceberg/StaticDataTask.java @@ -30,18 +30,26 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.StructProjection; class StaticDataTask implements DataTask { - static DataTask of(InputFile metadata, Iterable values, Function transform) { + static DataTask of(InputFile metadata, Schema tableSchema, Schema projectedSchema, Iterable values, + Function transform) { return new StaticDataTask(metadata, + tableSchema, + projectedSchema, Lists.newArrayList(Iterables.transform(values, transform::apply)).toArray(new Row[0])); } private final DataFile metadataFile; private final StructLike[] rows; + private final Schema tableSchema; + private final Schema projectedSchema; - private StaticDataTask(InputFile metadata, StructLike[] rows) { + private StaticDataTask(InputFile metadata, Schema tableSchema, Schema projectedSchema, StructLike[] rows) { + this.tableSchema = tableSchema; + this.projectedSchema = projectedSchema; this.metadataFile = DataFiles.builder(PartitionSpec.unpartitioned()) .withInputFile(metadata) .withRecordCount(rows.length) @@ -57,7 +65,9 @@ public List deletes() { @Override public CloseableIterable rows() { - return CloseableIterable.withNoopClose(Arrays.asList(rows)); + StructProjection projection = StructProjection.create(tableSchema, projectedSchema); + Iterable projectedRows = Iterables.transform(Arrays.asList(rows), projection::wrap); + return CloseableIterable.withNoopClose(projectedRows); } @Override diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index 6d4bf8ec3933..391d4a053490 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -19,7 +19,6 @@ package org.apache.iceberg.spark.source; -import java.util.List; import java.util.Map; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DataFile; @@ -31,7 +30,6 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; @@ -40,28 +38,17 @@ import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.SparkAvroReader; import org.apache.iceberg.spark.data.SparkOrcReader; import org.apache.iceberg.spark.data.SparkParquetReaders; import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PartitionUtil; import org.apache.spark.rdd.InputFileBlockHolder; import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.expressions.Attribute; -import org.apache.spark.sql.catalyst.expressions.AttributeReference; -import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; -import org.apache.spark.sql.types.StructType; -import scala.collection.JavaConverters; class RowDataReader extends BaseDataReader { - // for some reason, the apply method can't be called from Java without reflection - private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply") - .impl(UnsafeProjection.class, InternalRow.class) - .build(); private final Schema tableSchema; private final Schema expectedSchema; @@ -186,33 +173,10 @@ private CloseableIterable newOrcIterable( } private CloseableIterable newDataIterable(DataTask task, Schema readSchema) { - StructInternalRow row = new StructInternalRow(tableSchema.asStruct()); + StructInternalRow row = new StructInternalRow(readSchema.asStruct()); CloseableIterable asSparkRows = CloseableIterable.transform( task.asDataTask().rows(), row::setStruct); - return CloseableIterable.transform( - asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke); - } - - private static UnsafeProjection projection(Schema finalSchema, Schema readSchema) { - StructType struct = SparkSchemaUtil.convert(readSchema); - - List refs = JavaConverters.seqAsJavaListConverter(struct.toAttributes()).asJava(); - List attrs = Lists.newArrayListWithExpectedSize(struct.fields().length); - List exprs = - Lists.newArrayListWithExpectedSize(struct.fields().length); - - for (AttributeReference ref : refs) { - attrs.add(ref.toAttribute()); - } - - for (Types.NestedField field : finalSchema.columns()) { - int indexInReadSchema = struct.fieldIndex(field.name()); - exprs.add(refs.get(indexInReadSchema)); - } - - return UnsafeProjection.create( - JavaConverters.asScalaBufferConverter(exprs).asScala().toSeq(), - JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq()); + return asSparkRows; } protected class SparkDeleteFilter extends DeleteFilter { diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 4244137d338c..e4ca09f1fec8 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileContent; import org.apache.iceberg.ManifestFile; @@ -38,13 +39,16 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkTableUtil; import org.apache.iceberg.spark.SparkTestBase; import org.apache.iceberg.spark.data.TestHelpers; import org.apache.iceberg.types.Types; +import org.apache.spark.SparkException; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; @@ -143,6 +147,89 @@ public void testEntriesTable() throws Exception { TestHelpers.assertEqualsSafe(entriesTable.schema().asStruct(), expected.get(0), actual.get(0)); } + @Test + public void testEntriesTableDataFilePrune() throws Exception { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test"); + Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned()); + + List records = Lists.newArrayList(new SimpleRecord(1, "1")); + + Dataset inputDf = spark.createDataFrame(records, SimpleRecord.class); + inputDf.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + table.refresh(); + DataFile file = table.currentSnapshot().addedFiles().iterator().next(); + + List singleActual = rowsToJava(spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "entries")) + .select("data_file.file_path") + .collectAsList()); + + List singleExpected = ImmutableList.of(row(file.path())); + + assertEquals("Should prune a single element from a nested struct", singleExpected, singleActual); + } + + @Test + public void testEntriesTableDataFilePruneMulti() throws Exception { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test"); + Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned()); + + List records = Lists.newArrayList(new SimpleRecord(1, "1")); + + Dataset inputDf = spark.createDataFrame(records, SimpleRecord.class); + inputDf.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + table.refresh(); + DataFile file = table.currentSnapshot().addedFiles().iterator().next(); + + List multiActual = rowsToJava(spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "entries")) + .select("data_file.file_path", "data_file.value_counts", "data_file.record_count", "data_file.column_sizes") + .collectAsList()); + + List multiExpected = ImmutableList.of( + row(file.path(), file.valueCounts(), file.recordCount(), file.columnSizes())); + + assertEquals("Should prune a single element from a nested struct", multiExpected, multiActual); + } + + @Test + public void testFilesSelectMap() throws Exception { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test"); + Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned()); + + List records = Lists.newArrayList(new SimpleRecord(1, "1")); + + Dataset inputDf = spark.createDataFrame(records, SimpleRecord.class); + inputDf.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + table.refresh(); + DataFile file = table.currentSnapshot().addedFiles().iterator().next(); + + List multiActual = rowsToJava(spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "files")) + .select("file_path", "value_counts", "record_count", "column_sizes") + .collectAsList()); + + List multiExpected = ImmutableList.of( + row(file.path(), file.valueCounts(), file.recordCount(), file.columnSizes())); + + assertEquals("Should prune a single element from a row", multiExpected, multiActual); + } + @Test public void testAllEntriesTable() throws Exception { TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test"); @@ -677,6 +764,70 @@ public void testSnapshotsTable() { TestHelpers.assertEqualsSafe(snapTable.schema().asStruct(), expected.get(1), actual.get(1)); } + @Test + public void testPrunedSnapshotsTable() { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "snapshots_test"); + Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned()); + + List records = Lists.newArrayList(new SimpleRecord(1, "1")); + Dataset inputDf = spark.createDataFrame(records, SimpleRecord.class); + + inputDf.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + table.refresh(); + long firstSnapshotTimestamp = table.currentSnapshot().timestampMillis(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + + table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); + + long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis(); + + // rollback the table state to the first snapshot + table.rollback().toSnapshotId(firstSnapshotId).commit(); + + Dataset actualDf = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "snapshots")) + .select("operation", "committed_at", "summary", "parent_id"); + + Schema projectedSchema = SparkSchemaUtil.convert(actualDf.schema()); + + List actual = actualDf.collectAsList(); + + GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(projectedSchema, "snapshots")); + List expected = Lists.newArrayList( + builder.set("committed_at", firstSnapshotTimestamp * 1000) + .set("parent_id", null) + .set("operation", "append") + .set("summary", ImmutableMap.of( + "added-records", "1", + "added-data-files", "1", + "changed-partition-count", "1", + "total-data-files", "1", + "total-records", "1" + )) + .build(), + builder.set("committed_at", secondSnapshotTimestamp * 1000) + .set("parent_id", firstSnapshotId) + .set("operation", "delete") + .set("summary", ImmutableMap.of( + "deleted-records", "1", + "deleted-data-files", "1", + "changed-partition-count", "1", + "total-records", "0", + "total-data-files", "0" + )) + .build() + ); + + Assert.assertEquals("Snapshots table should have a row for each snapshot", 2, actual.size()); + TestHelpers.assertEqualsSafe(projectedSchema.asStruct(), expected.get(0), actual.get(0)); + TestHelpers.assertEqualsSafe(projectedSchema.asStruct(), expected.get(1), actual.get(1)); + } + @Test public void testManifestsTable() { TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test"); @@ -724,6 +875,66 @@ public void testManifestsTable() { TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(0), actual.get(0)); } + @Test + public void testPruneManifestsTable() { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test"); + Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build()); + Table manifestTable = loadTable(tableIdentifier, "manifests"); + Dataset df1 = spark.createDataFrame( + Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(null, "b")), SimpleRecord.class); + + df1.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + if (!spark.version().startsWith("2")) { + // Spark 2 isn't able to actually push down nested struct projections so this will not break + AssertHelpers.assertThrows("Can't prune struct inside list", SparkException.class, + "Cannot project a partial list element struct", + () -> spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "manifests")) + .select("partition_spec_id", "path", "partition_summaries.contains_null") + .collectAsList()); + } + + Dataset actualDf = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "manifests")) + .select("partition_spec_id", "path", "partition_summaries"); + + Schema projectedSchema = SparkSchemaUtil.convert(actualDf.schema()); + + List actual = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "manifests")) + .select("partition_spec_id", "path", "partition_summaries") + .collectAsList(); + + table.refresh(); + + GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(projectedSchema.asStruct())); + GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert( + projectedSchema.findType("partition_summaries.element").asStructType(), "partition_summary")); + List expected = Lists.transform(table.currentSnapshot().allManifests(), manifest -> + builder.set("partition_spec_id", manifest.partitionSpecId()) + .set("path", manifest.path()) + .set("partition_summaries", Lists.transform(manifest.partitions(), partition -> + summaryBuilder + .set("contains_null", true) + .set("contains_nan", false) + .set("lower_bound", "1") + .set("upper_bound", "1") + .build() + )) + .build() + ); + + Assert.assertEquals("Manifests table should have one manifest row", 1, actual.size()); + TestHelpers.assertEqualsSafe(projectedSchema.asStruct(), expected.get(0), actual.get(0)); + } + @Test public void testAllManifestsTable() { TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test");