From bfacdcb7a4368ec879918c5eac20c8d2799a6861 Mon Sep 17 00:00:00 2001
From: Russell_Spitzer
Date: Mon, 26 Jul 2021 17:17:06 -0500
Subject: [PATCH 01/11] Spark: Support Nested Struct Pruning

Previously we would only prune top-level schema elements, which wasn't a
problem because Spark could not prune nested schemas. In Spark 3.1 this is
now possible, so we need to adjust our projection code to make bindings
correctly when structs are pruned.
---
 .../iceberg/spark/source/RowDataReader.java   | 58 +++++++++++++--
 .../spark/source/TestSparkMetadataTables.java | 74 +++++++++++++++++++
 2 files changed, 125 insertions(+), 7 deletions(-)
 create mode 100644 spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataTables.java

diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 6d4bf8ec3933..44ce76967faf 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -54,6 +54,7 @@
 import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.catalyst.expressions.AttributeReference;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import scala.collection.JavaConverters;
 
@@ -194,20 +195,32 @@ private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema rea
   }
 
   private static UnsafeProjection projection(Schema finalSchema, Schema readSchema) {
-    StructType struct = SparkSchemaUtil.convert(readSchema);
+    StructType readStruct = SparkSchemaUtil.convert(readSchema);
 
-    List<AttributeReference> refs = JavaConverters.seqAsJavaListConverter(struct.toAttributes()).asJava();
-    List<Attribute> attrs = Lists.newArrayListWithExpectedSize(struct.fields().length);
+    List<AttributeReference> readReferences = JavaConverters.seqAsJavaListConverter(readStruct.toAttributes()).asJava();
+    List<Attribute> attrs = Lists.newArrayListWithExpectedSize(readStruct.fields().length);
     List<org.apache.spark.sql.catalyst.expressions.Expression> exprs =
-        Lists.newArrayListWithExpectedSize(struct.fields().length);
+        Lists.newArrayListWithExpectedSize(readStruct.fields().length);
 
-    for (AttributeReference ref : refs) {
+    for (AttributeReference ref : readReferences) {
       attrs.add(ref.toAttribute());
     }
 
     for (Types.NestedField field : finalSchema.columns()) {
-      int indexInReadSchema = struct.fieldIndex(field.name());
-      exprs.add(refs.get(indexInReadSchema));
+      int indexInReadSchema = readStruct.fieldIndex(field.name());
+      if (field.type().isStructType()) {
+        StructField nestedType = readStruct.fields()[indexInReadSchema];
+        AttributeReference ref = readReferences.get(indexInReadSchema);
+        exprs.add(ref.copy(
+            ref.name(),
+            projectInner(field.type().asStructType(), nestedType),
+            ref.nullable(),
+            ref.metadata(),
+            ref.exprId(),
+            ref.qualifier()));
+      } else {
+        exprs.add(readReferences.get(indexInReadSchema));
+      }
     }
 
     return UnsafeProjection.create(
@@ -215,6 +228,37 @@ private static UnsafeProjection projection(Schema finalSchema, Schema readSchema
         JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq());
   }
 
+  private static StructType projectInner(Types.StructType structField, StructField nestedStructField) {
+    Preconditions.checkState(nestedStructField.dataType() instanceof StructType);
+    StructType nestedType = (StructType) nestedStructField.dataType();
+    List<AttributeReference> readReferences = JavaConverters.seqAsJavaListConverter(nestedType.toAttributes()).asJava();
+    List<Attribute> attrs = Lists.newArrayListWithExpectedSize(nestedType.fields().length);
+    List<StructField> fields = Lists.newArrayListWithExpectedSize(nestedType.fields().length);
+
+    for (AttributeReference ref : readReferences) {
+      attrs.add(ref.toAttribute());
+    }
+
+    for (Types.NestedField field : structField.fields()) {
+      int indexInReadSchema = nestedType.fieldIndex(field.name());
+      if (field.type().isStructType()) {
+        StructField innerInner = nestedType.fields()[indexInReadSchema];
+        fields.add(
+            new StructField(
+                innerInner.name(),
+                projectInner(field.type().asStructType(), innerInner),
+                innerInner.nullable(),
+                innerInner.metadata())
+        );
+      } else {
+        fields.add(nestedType.fields()[indexInReadSchema]);
+      }
+    }
+
+    return new StructType(fields.stream().toArray(StructField[]::new));
+  }
+
+
   protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
 
     private final InternalRowWrapper asStructLike;
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataTables.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataTables.java
new file mode 100644
index 000000000000..fbb865158fd3
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataTables.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.iceberg.spark.source; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.iceberg.spark.SparkCatalogTestBase; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import scala.Some; + +public class TestSparkMetadataTables extends SparkCatalogTestBase { + + public TestSparkMetadataTables(String catalogName, String implementation, Map config) { + super(catalogName, implementation, config); + } + + @Test + public void testDataFileProjectionError2() throws Exception { + // init load + List records = Lists.newArrayList(new SimpleRecord(1, "1")); + Dataset inputDf = spark.createDataFrame(records, SimpleRecord.class); + inputDf.writeTo(tableName).create(); + + + Dataset stringDs = spark.createDataset(Arrays.asList("my_path"), Encoders.STRING()) + .toDF("file_path"); + + SparkCatalog catalog = (SparkCatalog) spark.sessionState().catalogManager().catalog(catalogName); + String[] tableIdentifiers = tableName.split("\\."); + Identifier metaId = Identifier.of( + new String[]{tableIdentifiers[1], tableIdentifiers[2]}, "entries"); + SparkTable metaTable = catalog.loadTable(metaId); + Dataset entriesDs = Dataset.ofRows(spark, DataSourceV2Relation.create(metaTable, Some.apply(catalog), + Some.apply(metaId))); + + Column joinCond = entriesDs.col("data_file.file_path").equalTo(stringDs.col("file_path")); + Dataset res = entriesDs.join(stringDs, joinCond); + boolean empty = res.isEmpty(); + Assert.assertEquals(true, empty); + } + + @Before + public void dropTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + } +} From 883d4c00f5f7322e012f28e9d08dde7bb3d74a82 Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Tue, 27 Jul 2021 11:30:09 -0500 Subject: [PATCH 02/11] Clean up fix --- .../iceberg/spark/source/RowDataReader.java | 35 +-------- .../source/TestIcebergSourceTablesBase.java | 24 ++++++ .../spark/source/TestSparkMetadataTables.java | 74 ------------------- 3 files changed, 26 insertions(+), 107 deletions(-) delete mode 100644 spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataTables.java diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index 44ce76967faf..c03e5414f1b3 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -209,11 +209,11 @@ private static UnsafeProjection projection(Schema finalSchema, Schema readSchema for (Types.NestedField field : finalSchema.columns()) { int indexInReadSchema = readStruct.fieldIndex(field.name()); if (field.type().isStructType()) { - StructField nestedType = readStruct.fields()[indexInReadSchema]; + // We may need to prune this attribute to only refer to our expected schema AttributeReference ref = readReferences.get(indexInReadSchema); exprs.add(ref.copy( ref.name(), - projectInner(field.type().asStructType(), nestedType), + SparkSchemaUtil.convert(field.type().asStructType()), ref.nullable(), ref.metadata(), ref.exprId(), @@ -228,37 +228,6 
@@ private static UnsafeProjection projection(Schema finalSchema, Schema readSchema JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq()); } - private static StructType projectInner(Types.StructType structField, StructField nestedStructField) { - Preconditions.checkState(nestedStructField.dataType() instanceof StructType); - StructType nestedType = (StructType) nestedStructField.dataType(); - List readReferences = JavaConverters.seqAsJavaListConverter(nestedType.toAttributes()).asJava(); - List attrs = Lists.newArrayListWithExpectedSize(nestedType.fields().length); - List fields = Lists.newArrayListWithExpectedSize(nestedType.fields().length); - - for (AttributeReference ref : readReferences) { - attrs.add(ref.toAttribute()); - } - - for (Types.NestedField field : structField.fields()) { - int indexInReadSchema = nestedType.fieldIndex(field.name()); - if (field.type().isStructType()) { - StructField innerInner = nestedType.fields()[indexInReadSchema]; - fields.add( - new StructField( - innerInner.name(), - projectInner(field.type().asStructType(), innerInner), - innerInner.nullable(), - innerInner.metadata()) - ); - } else { - fields.add(nestedType.fields()[indexInReadSchema]); - } - } - - return new StructType(fields.stream().toArray(StructField[]::new)); - } - - protected class SparkDeleteFilter extends DeleteFilter { private final InternalRowWrapper asStructLike; diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 4244137d338c..f4cc5464259e 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -144,6 +144,30 @@ public void testEntriesTable() throws Exception { } @Test + public void testEntriesTableDataFilePrune() throws Exception { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test"); + Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned()); + + List records = Lists.newArrayList(new SimpleRecord(1, "1")); + + Dataset inputDf = spark.createDataFrame(records, SimpleRecord.class); + inputDf.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + table.refresh(); + + List actual = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "entries")) + .select("data_file.file_path") + .collectAsList(); + + Assert.assertEquals("Should have a single entry", 1, actual.size()); + Assert.assertEquals("Should only have file_path", 0, actual.get(0).fieldIndex("file_path")); + } + public void testAllEntriesTable() throws Exception { TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test"); Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned()); diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataTables.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataTables.java deleted file mode 100644 index fbb865158fd3..000000000000 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataTables.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.spark.source; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.spark.SparkCatalog; -import org.apache.iceberg.spark.SparkCatalogTestBase; -import org.apache.spark.sql.Column; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.connector.catalog.Identifier; -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import scala.Some; - -public class TestSparkMetadataTables extends SparkCatalogTestBase { - - public TestSparkMetadataTables(String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } - - @Test - public void testDataFileProjectionError2() throws Exception { - // init load - List records = Lists.newArrayList(new SimpleRecord(1, "1")); - Dataset inputDf = spark.createDataFrame(records, SimpleRecord.class); - inputDf.writeTo(tableName).create(); - - - Dataset stringDs = spark.createDataset(Arrays.asList("my_path"), Encoders.STRING()) - .toDF("file_path"); - - SparkCatalog catalog = (SparkCatalog) spark.sessionState().catalogManager().catalog(catalogName); - String[] tableIdentifiers = tableName.split("\\."); - Identifier metaId = Identifier.of( - new String[]{tableIdentifiers[1], tableIdentifiers[2]}, "entries"); - SparkTable metaTable = catalog.loadTable(metaId); - Dataset entriesDs = Dataset.ofRows(spark, DataSourceV2Relation.create(metaTable, Some.apply(catalog), - Some.apply(metaId))); - - Column joinCond = entriesDs.col("data_file.file_path").equalTo(stringDs.col("file_path")); - Dataset res = entriesDs.join(stringDs, joinCond); - boolean empty = res.isEmpty(); - Assert.assertEquals(true, empty); - } - - @Before - public void dropTables() { - sql("DROP TABLE IF EXISTS %s", tableName); - } -} From c69da8a8c1c2f99de3a1b826514775f0f07bde72 Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Tue, 27 Jul 2021 11:55:38 -0500 Subject: [PATCH 03/11] Checkstyle --- .../main/java/org/apache/iceberg/spark/source/RowDataReader.java | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index c03e5414f1b3..eb1408206f95 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -54,7 +54,6 @@ import org.apache.spark.sql.catalyst.expressions.Attribute; import org.apache.spark.sql.catalyst.expressions.AttributeReference; import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; -import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import 
scala.collection.JavaConverters;

From b75737090cc0e466c9c29700e58471668a248e7a Mon Sep 17 00:00:00 2001
From: Russell_Spitzer
Date: Tue, 27 Jul 2021 22:01:32 -0500
Subject: [PATCH 04/11] Fix Nested Pruning Issue by Exposing Projected Scan Schema

The underlying issue we were trying to solve is that ManifestEntriesTable is
allowed to prune columns from the underlying manifest entries that it reads,
but it does not expose that it has done so in the table schema. Only the
ManifestEntries themselves know they have been pruned, and because of this we
have no way of recovering this information at scan time. To fix this we add
the ability for DataTasks to expose a pruned schema which can be used by the
various engines to generate proper projections.
---
 .../java/org/apache/iceberg/DataTask.java     |  9 +++
 .../apache/iceberg/ManifestEntriesTable.java  |  5 ++
 .../iceberg/spark/source/RowDataReader.java   | 19 ++----
 .../source/TestIcebergSourceTablesBase.java   | 68 +++++++++++++++++--
 4 files changed, 82 insertions(+), 19 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/DataTask.java b/api/src/main/java/org/apache/iceberg/DataTask.java
index f2a8d2a9d873..cd7cd0903517 100644
--- a/api/src/main/java/org/apache/iceberg/DataTask.java
+++ b/api/src/main/java/org/apache/iceberg/DataTask.java
@@ -35,6 +35,15 @@ default DataTask asDataTask() {
     return this;
   }
 
+  /**
+   * Optionally return a schema which this task will produce rows with, this may be null if
+   * the task should inherit the schema from the parent table. Scans which can prune columns can
+   * use this to correctly report their pruned schema if it differs from the default table schema.
+   */
+  default Schema schema() {
+    return null;
+  }
+
   /**
    * Returns an iterable of {@link StructLike} rows.
    */
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index 6b434fa1ed14..d7ecdae48b58 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -135,6 +135,11 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask {
       this.specsById = specsById;
     }
 
+    @Override
+    public Schema schema() {
+      return ManifestEntry.wrapFileSchema(fileSchema.asStruct());
+    }
+
     @Override
     public CloseableIterable<StructLike> rows() {
       if (manifest.content() == ManifestContent.DATA) {
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index eb1408206f95..eec0101e7091 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -186,11 +186,12 @@ private CloseableIterable<InternalRow> newOrcIterable(
   }
 
   private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
-    StructInternalRow row = new StructInternalRow(tableSchema.asStruct());
+    Schema taskSchema = task.schema() == null ?
tableSchema : task.schema(); + StructInternalRow row = new StructInternalRow(taskSchema.asStruct()); CloseableIterable asSparkRows = CloseableIterable.transform( task.asDataTask().rows(), row::setStruct); return CloseableIterable.transform( - asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke); + asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, taskSchema))::invoke); } private static UnsafeProjection projection(Schema finalSchema, Schema readSchema) { @@ -207,19 +208,7 @@ private static UnsafeProjection projection(Schema finalSchema, Schema readSchema for (Types.NestedField field : finalSchema.columns()) { int indexInReadSchema = readStruct.fieldIndex(field.name()); - if (field.type().isStructType()) { - // We may need to prune this attribute to only refer to our expected schema - AttributeReference ref = readReferences.get(indexInReadSchema); - exprs.add(ref.copy( - ref.name(), - SparkSchemaUtil.convert(field.type().asStructType()), - ref.nullable(), - ref.metadata(), - ref.exprId(), - ref.qualifier())); - } else { - exprs.add(readReferences.get(indexInReadSchema)); - } + exprs.add(readReferences.get(indexInReadSchema)); } return UnsafeProjection.create( diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index f4cc5464259e..4315f1c0ac81 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -38,6 +38,7 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -157,17 +158,76 @@ public void testEntriesTableDataFilePrune() throws Exception { .save(loadLocation(tableIdentifier)); table.refresh(); + DataFile file = table.currentSnapshot().addedFiles().iterator().next(); - List actual = spark.read() + List singleActual = rowsToJava(spark.read() .format("iceberg") .load(loadLocation(tableIdentifier, "entries")) .select("data_file.file_path") - .collectAsList(); + .collectAsList()); + + List singleExpected = ImmutableList.of(row(file.path())); + + assertEquals("Should prune a single element from a nested struct", singleExpected, singleActual); + } + + @Test + public void testEntriesTableDataFilePruneMulti() throws Exception { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test"); + Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned()); + + List records = Lists.newArrayList(new SimpleRecord(1, "1")); + + Dataset inputDf = spark.createDataFrame(records, SimpleRecord.class); + inputDf.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + table.refresh(); + DataFile file = table.currentSnapshot().addedFiles().iterator().next(); + + List multiActual = rowsToJava(spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "entries")) + .select("data_file.file_path", "data_file.value_counts", "data_file.record_count", "data_file.column_sizes") + .collectAsList()); - Assert.assertEquals("Should have a 
single entry", 1, actual.size()); - Assert.assertEquals("Should only have file_path", 0, actual.get(0).fieldIndex("file_path")); + List multiExpected = ImmutableList.of( + row(file.path(), file.valueCounts(), file.recordCount(), file.columnSizes())); + + assertEquals("Should prune a single element from a nested struct", multiExpected, multiActual); + } + + @Test + public void testFilesSelectMap() throws Exception { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test"); + Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned()); + + List records = Lists.newArrayList(new SimpleRecord(1, "1")); + + Dataset inputDf = spark.createDataFrame(records, SimpleRecord.class); + inputDf.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + table.refresh(); + DataFile file = table.currentSnapshot().addedFiles().iterator().next(); + + List multiActual = rowsToJava(spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "files")) + .select("file_path", "value_counts", "record_count", "column_sizes") + .collectAsList()); + + List multiExpected = ImmutableList.of( + row(file.path(), file.valueCounts(), file.recordCount(), file.columnSizes())); + + assertEquals("Should prune a single element from a row", multiExpected, multiActual); } + @Test public void testAllEntriesTable() throws Exception { TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test"); Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned()); From 09871b07463cd41ed846a425436626a465cbbbd6 Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Mon, 2 Aug 2021 16:36:25 -0500 Subject: [PATCH 05/11] Move DataTask pruning into Tasks Previously DataTasks would return full schemas for some tables and pruned schemas for others and would rely on the Framework to do the actual projection. This moves projection and pruning into the core responsibility of the task. --- .../java/org/apache/iceberg/DataTask.java | 9 ------ .../apache/iceberg/util/StructProjection.java | 2 +- .../org/apache/iceberg/AllEntriesTable.java | 4 +-- .../org/apache/iceberg/AllManifestsTable.java | 17 +++++++---- .../org/apache/iceberg/DataFilesTable.java | 6 +--- .../java/org/apache/iceberg/HistoryTable.java | 4 ++- .../apache/iceberg/ManifestEntriesTable.java | 28 +++++++++++-------- .../org/apache/iceberg/ManifestsTable.java | 4 ++- .../org/apache/iceberg/PartitionsTable.java | 16 ++++++++--- .../org/apache/iceberg/SnapshotsTable.java | 4 ++- .../org/apache/iceberg/StaticDataTask.java | 18 +++++++++--- .../iceberg/spark/source/RowDataReader.java | 6 ++-- 12 files changed, 69 insertions(+), 49 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/DataTask.java b/api/src/main/java/org/apache/iceberg/DataTask.java index cd7cd0903517..f2a8d2a9d873 100644 --- a/api/src/main/java/org/apache/iceberg/DataTask.java +++ b/api/src/main/java/org/apache/iceberg/DataTask.java @@ -35,15 +35,6 @@ default DataTask asDataTask() { return this; } - /** - * Optionally return a schema which this task will produce rows with, this may be null if - * the task should inherit the schema from the parent table. Scans which can prune columns can - * use this to correctly report their pruned schema if it differs from the default table schema. - */ - default Schema schema() { - return null; - } - /** * Returns an iterable of {@link StructLike} rows. 
*/ diff --git a/api/src/main/java/org/apache/iceberg/util/StructProjection.java b/api/src/main/java/org/apache/iceberg/util/StructProjection.java index d916e7784c62..2c20e431a48b 100644 --- a/api/src/main/java/org/apache/iceberg/util/StructProjection.java +++ b/api/src/main/java/org/apache/iceberg/util/StructProjection.java @@ -83,7 +83,7 @@ private StructProjection(StructType structType, StructType projection) { break; case MAP: case LIST: - throw new IllegalArgumentException(String.format("Cannot project list or map field: %s", projectedField)); + // TODO Figure this out default: nestedProjections[pos] = null; } diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java index c69429d80576..c82c7bdf7d08 100644 --- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java +++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java @@ -102,15 +102,13 @@ protected CloseableIterable planFiles( TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean ignoreResiduals, boolean caseSensitive, boolean colStats) { CloseableIterable manifests = allManifestFiles(ops.current().snapshots()); - Type fileProjection = schema().findType("data_file"); - Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema(); String schemaString = SchemaParser.toJson(schema()); String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned()); Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter; ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter); return CloseableIterable.transform(manifests, manifest -> new ManifestEntriesTable.ManifestReadTask( - ops.io(), manifest, fileSchema, schemaString, specString, residuals, ops.current().specsById())); + ops.io(), manifest, schema(), schemaString, specString, residuals, ops.current().specsById())); } } diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java index d6719343cbc0..f17132b16d13 100644 --- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java +++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java @@ -31,6 +31,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructProjection; /** * A {@link Table} implementation that exposes a table's valid manifest files as rows. @@ -124,7 +125,6 @@ protected CloseableIterable planFiles( String schemaString = SchemaParser.toJson(schema()); String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned()); - // Data tasks produce the table schema, not the projection schema and projection is done by processing engines. return CloseableIterable.withNoopClose(Iterables.transform(ops.current().snapshots(), snap -> { if (snap.manifestListLocation() != null) { Expression filter = ignoreResiduals ? 
Expressions.alwaysTrue() : rowFilter; @@ -134,14 +134,16 @@ protected CloseableIterable planFiles( .withRecordCount(1) .withFormat(FileFormat.AVRO) .build(); - return new ManifestListReadTask(ops.io(), table().spec(), new BaseFileScanTask( + return new ManifestListReadTask(ops.io(), schema(), table().spec(), new BaseFileScanTask( manifestListAsDataFile, null, schemaString, specString, residuals)); } else { return StaticDataTask.of( ops.io().newInputFile(ops.current().metadataFileLocation()), snap.allManifests(), - manifest -> ManifestsTable.manifestFileToRow(table().spec(), manifest)); + manifest -> ManifestsTable.manifestFileToRow(table().spec(), manifest), + MANIFEST_FILE_SCHEMA, + schema()); } })); } @@ -149,11 +151,13 @@ protected CloseableIterable planFiles( static class ManifestListReadTask implements DataTask { private final FileIO io; + private final Schema schema; private final PartitionSpec spec; private final FileScanTask manifestListTask; - ManifestListReadTask(FileIO io, PartitionSpec spec, FileScanTask manifestListTask) { + ManifestListReadTask(FileIO io, Schema schema, PartitionSpec spec, FileScanTask manifestListTask) { this.io = io; + this.schema = schema; this.spec = spec; this.manifestListTask = manifestListTask; } @@ -175,9 +179,12 @@ public CloseableIterable rows() { .reuseContainers(false) .build()) { - return CloseableIterable.transform(manifests, + CloseableIterable rowIterable = CloseableIterable.transform(manifests, manifest -> ManifestsTable.manifestFileToRow(spec, manifest)); + StructProjection projection = StructProjection.create(MANIFEST_FILE_SCHEMA, schema); + return CloseableIterable.transform(rowIterable, projection::wrap); + } catch (IOException e) { throw new RuntimeIOException(e, "Cannot read manifest list file: %s", manifestListTask.file().path()); } diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java index 13d28ad52b13..145663ce4d9b 100644 --- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java @@ -109,12 +109,8 @@ protected CloseableIterable planFiles( Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter; ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter); - // Data tasks produce the table schema, not the projection schema and projection is done by processing engines. - // This data task needs to use the table schema, which may not include a partition schema to avoid having an - // empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in - // all cases. 
return CloseableIterable.transform(manifests, manifest -> - new ManifestReadTask(ops.io(), manifest, fileSchema, schemaString, specString, residuals)); + new ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals)); } } diff --git a/core/src/main/java/org/apache/iceberg/HistoryTable.java b/core/src/main/java/org/apache/iceberg/HistoryTable.java index 9b607ccbd6be..5f98b6d0902f 100644 --- a/core/src/main/java/org/apache/iceberg/HistoryTable.java +++ b/core/src/main/java/org/apache/iceberg/HistoryTable.java @@ -69,7 +69,9 @@ private DataTask task(TableScan scan) { return StaticDataTask.of( ops.io().newInputFile(ops.current().metadataFileLocation()), ops.current().snapshotLog(), - convertHistoryEntryFunc(table())); + convertHistoryEntryFunc(table()), + schema(), + scan.schema()); } private class HistoryScan extends StaticTableScan { diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java index d7ecdae48b58..a09a0855c16a 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java +++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java @@ -29,6 +29,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.util.StructProjection; /** * A {@link Table} implementation that exposes a table's manifest entries as rows, for both delete and data files. @@ -107,49 +109,53 @@ protected CloseableIterable planFiles( boolean ignoreResiduals, boolean caseSensitive, boolean colStats) { // return entries from both data and delete manifests CloseableIterable manifests = CloseableIterable.withNoopClose(snapshot.allManifests()); - Type fileProjection = schema().findType("data_file"); - Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema(); String schemaString = SchemaParser.toJson(schema()); String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned()); Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter; ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter); return CloseableIterable.transform(manifests, manifest -> - new ManifestReadTask(ops.io(), manifest, fileSchema, schemaString, specString, residuals, + new ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals, ops.current().specsById())); } } static class ManifestReadTask extends BaseFileScanTask implements DataTask { + private final Schema schema; private final Schema fileSchema; private final FileIO io; private final ManifestFile manifest; private final Map specsById; - ManifestReadTask(FileIO io, ManifestFile manifest, Schema fileSchema, String schemaString, + ManifestReadTask(FileIO io, ManifestFile manifest, Schema schema, String schemaString, String specString, ResidualEvaluator residuals, Map specsById) { super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals); - this.fileSchema = fileSchema; + this.schema = schema; this.io = io; this.manifest = manifest; this.specsById = specsById; - } - @Override - public Schema schema() { - return ManifestEntry.wrapFileSchema(fileSchema.asStruct()); + Type fileProjection = schema.findType("data_file"); + this.fileSchema = fileProjection != null ? 
new Schema(fileProjection.asStructType().fields()) : new Schema(); } @Override public CloseableIterable rows() { + // Project data-file fields + CloseableIterable prunedRows; if (manifest.content() == ManifestContent.DATA) { - return CloseableIterable.transform(ManifestFiles.read(manifest, io).project(fileSchema).entries(), + prunedRows = CloseableIterable.transform(ManifestFiles.read(manifest, io).project(fileSchema).entries(), file -> (GenericManifestEntry) file); } else { - return CloseableIterable.transform(ManifestFiles.readDeleteManifest(manifest, io, specsById) + prunedRows = CloseableIterable.transform(ManifestFiles.readDeleteManifest(manifest, io, specsById) .project(fileSchema).entries(), file -> (GenericManifestEntry) file); } + + // Project non-readable fields + Schema readSchema = ManifestEntry.wrapFileSchema(fileSchema.asStruct()); + StructProjection projection = StructProjection.create(readSchema, schema); + return CloseableIterable.transform(prunedRows, projection::wrap); } @Override diff --git a/core/src/main/java/org/apache/iceberg/ManifestsTable.java b/core/src/main/java/org/apache/iceberg/ManifestsTable.java index d67b8330d21d..855bea0eedd4 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestsTable.java +++ b/core/src/main/java/org/apache/iceberg/ManifestsTable.java @@ -76,7 +76,9 @@ protected DataTask task(TableScan scan) { return StaticDataTask.of( ops.io().newInputFile(location != null ? location : ops.current().metadataFileLocation()), scan.snapshot().allManifests(), - manifest -> ManifestsTable.manifestFileToRow(spec, manifest)); + manifest -> ManifestsTable.manifestFileToRow(spec, manifest), + schema(), + scan.schema()); } private class ManifestsTableScan extends StaticTableScan { diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java index e190ca44560a..e663c2c8548b 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java +++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java @@ -76,11 +76,19 @@ private DataTask task(StaticTableScan scan) { Iterable partitions = partitions(scan); if (table().spec().fields().size() < 1) { // the table is unpartitioned, partitions contains only the root partition - return StaticDataTask.of(io().newInputFile(ops.current().metadataFileLocation()), partitions, - root -> StaticDataTask.Row.of(root.recordCount, root.fileCount)); + return StaticDataTask.of( + io().newInputFile(ops.current().metadataFileLocation()), + partitions, + root -> StaticDataTask.Row.of(root.recordCount, root.fileCount), + schema(), + scan.schema()); } else { - return StaticDataTask.of(io().newInputFile(ops.current().metadataFileLocation()), partitions, - PartitionsTable::convertPartition); + return StaticDataTask.of( + io().newInputFile(ops.current().metadataFileLocation()), + partitions, + PartitionsTable::convertPartition, + schema(), + scan.schema()); } } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotsTable.java b/core/src/main/java/org/apache/iceberg/SnapshotsTable.java index 3501662bc46a..d47d651db774 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotsTable.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotsTable.java @@ -61,7 +61,9 @@ private DataTask task(BaseTableScan scan) { return StaticDataTask.of( ops.io().newInputFile(ops.current().metadataFileLocation()), ops.current().snapshots(), - SnapshotsTable::snapshotToRow); + SnapshotsTable::snapshotToRow, + schema(), + scan.schema()); } @Override diff --git 
a/core/src/main/java/org/apache/iceberg/StaticDataTask.java b/core/src/main/java/org/apache/iceberg/StaticDataTask.java index 24bff01e3da7..716fb41ffc30 100644 --- a/core/src/main/java/org/apache/iceberg/StaticDataTask.java +++ b/core/src/main/java/org/apache/iceberg/StaticDataTask.java @@ -30,18 +30,26 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.StructProjection; class StaticDataTask implements DataTask { - static DataTask of(InputFile metadata, Iterable values, Function transform) { + static DataTask of(InputFile metadata, Iterable values, Function transform, + Schema original, Schema projected) { return new StaticDataTask(metadata, - Lists.newArrayList(Iterables.transform(values, transform::apply)).toArray(new Row[0])); + Lists.newArrayList(Iterables.transform(values, transform::apply)).toArray(new Row[0]), + original, + projected); } private final DataFile metadataFile; private final StructLike[] rows; + private final Schema original; + private final Schema projectedSchema; - private StaticDataTask(InputFile metadata, StructLike[] rows) { + private StaticDataTask(InputFile metadata, StructLike[] rows, Schema original, Schema projectedSchema) { + this.original = original; + this.projectedSchema = projectedSchema; this.metadataFile = DataFiles.builder(PartitionSpec.unpartitioned()) .withInputFile(metadata) .withRecordCount(rows.length) @@ -57,7 +65,9 @@ public List deletes() { @Override public CloseableIterable rows() { - return CloseableIterable.withNoopClose(Arrays.asList(rows)); + StructProjection projection = StructProjection.create(original, projectedSchema); + Iterable projectedRows = Iterables.transform(Arrays.asList(rows), projection::wrap); + return CloseableIterable.withNoopClose(projectedRows); } @Override diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index eec0101e7091..f118b1b2ecec 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -186,12 +186,10 @@ private CloseableIterable newOrcIterable( } private CloseableIterable newDataIterable(DataTask task, Schema readSchema) { - Schema taskSchema = task.schema() == null ? 
tableSchema : task.schema(); - StructInternalRow row = new StructInternalRow(taskSchema.asStruct()); + StructInternalRow row = new StructInternalRow(readSchema.asStruct()); CloseableIterable asSparkRows = CloseableIterable.transform( task.asDataTask().rows(), row::setStruct); - return CloseableIterable.transform( - asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, taskSchema))::invoke); + return asSparkRows; } private static UnsafeProjection projection(Schema finalSchema, Schema readSchema) { From b979cd8db78cf88dc7e464acff9112342c487bec Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Wed, 4 Aug 2021 14:10:56 -0500 Subject: [PATCH 06/11] Refactor based on review --- .../apache/iceberg/util/StructProjection.java | 28 +++- .../iceberg/spark/source/RowDataReader.java | 35 ----- .../source/TestIcebergSourceTablesBase.java | 125 ++++++++++++++++++ 3 files changed, 152 insertions(+), 36 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/util/StructProjection.java b/api/src/main/java/org/apache/iceberg/util/StructProjection.java index 2c20e431a48b..5050f84c6874 100644 --- a/api/src/main/java/org/apache/iceberg/util/StructProjection.java +++ b/api/src/main/java/org/apache/iceberg/util/StructProjection.java @@ -23,8 +23,11 @@ import java.util.Set; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.MapType; import org.apache.iceberg.types.Types.StructType; public class StructProjection implements StructLike { @@ -82,8 +85,31 @@ private StructProjection(StructType structType, StructType projection) { dataField.type().asStructType(), projectedField.type().asStructType()); break; case MAP: + MapType projectedMap = projectedField.type().asMapType(); + MapType originalMap = dataField.type().asMapType(); + + boolean keyProjectable = !projectedMap.keyType().isStructType() || + projectedMap.keyType().equals(originalMap.keyType()); + boolean valueProjectable = !projectedMap.valueType().isStructType() || + projectedMap.valueType().equals(originalMap.valueType()); + + Preconditions.checkArgument(keyProjectable && valueProjectable, + "Cannot perform a projection of a map unless key and value types are primitive or a " + + "struct which is fully projected. Trying to project %s out of %s", projectedField, dataField); + nestedProjections[pos] = null; + break; case LIST: - // TODO Figure this out + ListType projectedList = projectedField.type().asListType(); + ListType originalList = dataField.type().asListType(); + + boolean elementProjectable = !projectedList.elementType().isStructType() || + projectedList.elementType().equals(originalList.elementType()); + + Preconditions.checkArgument(elementProjectable, + "Cannot perform a projection of a list unless it's element is a primitive or a struct which is " + + "fully projected. 
Trying to project %s out of %s", projectedField, dataField); + nestedProjections[pos] = null; + break; default: nestedProjections[pos] = null; } diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index f118b1b2ecec..391d4a053490 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -19,7 +19,6 @@ package org.apache.iceberg.spark.source; -import java.util.List; import java.util.Map; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DataFile; @@ -31,7 +30,6 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; @@ -40,28 +38,17 @@ import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.SparkAvroReader; import org.apache.iceberg.spark.data.SparkOrcReader; import org.apache.iceberg.spark.data.SparkParquetReaders; import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PartitionUtil; import org.apache.spark.rdd.InputFileBlockHolder; import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.expressions.Attribute; -import org.apache.spark.sql.catalyst.expressions.AttributeReference; -import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; -import org.apache.spark.sql.types.StructType; -import scala.collection.JavaConverters; class RowDataReader extends BaseDataReader { - // for some reason, the apply method can't be called from Java without reflection - private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply") - .impl(UnsafeProjection.class, InternalRow.class) - .build(); private final Schema tableSchema; private final Schema expectedSchema; @@ -192,28 +179,6 @@ private CloseableIterable newDataIterable(DataTask task, Schema rea return asSparkRows; } - private static UnsafeProjection projection(Schema finalSchema, Schema readSchema) { - StructType readStruct = SparkSchemaUtil.convert(readSchema); - - List readReferences = JavaConverters.seqAsJavaListConverter(readStruct.toAttributes()).asJava(); - List attrs = Lists.newArrayListWithExpectedSize(readStruct.fields().length); - List exprs = - Lists.newArrayListWithExpectedSize(readStruct.fields().length); - - for (AttributeReference ref : readReferences) { - attrs.add(ref.toAttribute()); - } - - for (Types.NestedField field : finalSchema.columns()) { - int indexInReadSchema = readStruct.fieldIndex(field.name()); - exprs.add(readReferences.get(indexInReadSchema)); - } - - return UnsafeProjection.create( - JavaConverters.asScalaBufferConverter(exprs).asScala().toSeq(), - JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq()); - } - protected class SparkDeleteFilter extends DeleteFilter { private final InternalRowWrapper asStructLike; diff --git 
a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 4315f1c0ac81..31ad0a440792 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileContent; import org.apache.iceberg.ManifestFile; @@ -42,10 +43,12 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkTableUtil; import org.apache.iceberg.spark.SparkTestBase; import org.apache.iceberg.spark.data.TestHelpers; import org.apache.iceberg.types.Types; +import org.apache.spark.SparkException; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; @@ -761,6 +764,70 @@ public void testSnapshotsTable() { TestHelpers.assertEqualsSafe(snapTable.schema().asStruct(), expected.get(1), actual.get(1)); } + @Test + public void testPrunedSnapshotsTable() { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "snapshots_test"); + Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned()); + + List records = Lists.newArrayList(new SimpleRecord(1, "1")); + Dataset inputDf = spark.createDataFrame(records, SimpleRecord.class); + + inputDf.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + table.refresh(); + long firstSnapshotTimestamp = table.currentSnapshot().timestampMillis(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + + table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); + + long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis(); + + // rollback the table state to the first snapshot + table.rollback().toSnapshotId(firstSnapshotId).commit(); + + Dataset actualDf = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "snapshots")) + .select("operation", "committed_at", "summary", "parent_id"); + + Schema projectedSchema = SparkSchemaUtil.convert(actualDf.schema()); + + List actual = actualDf.collectAsList(); + + GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(projectedSchema, "snapshots")); + List expected = Lists.newArrayList( + builder.set("committed_at", firstSnapshotTimestamp * 1000) + .set("parent_id", null) + .set("operation", "append") + .set("summary", ImmutableMap.of( + "added-records", "1", + "added-data-files", "1", + "changed-partition-count", "1", + "total-data-files", "1", + "total-records", "1" + )) + .build(), + builder.set("committed_at", secondSnapshotTimestamp * 1000) + .set("parent_id", firstSnapshotId) + .set("operation", "delete") + .set("summary", ImmutableMap.of( + "deleted-records", "1", + "deleted-data-files", "1", + "changed-partition-count", "1", + "total-records", "0", + "total-data-files", "0" + )) + .build() + ); + + Assert.assertEquals("Snapshots table should have a row for each snapshot", 2, actual.size()); + 
TestHelpers.assertEqualsSafe(projectedSchema.asStruct(), expected.get(0), actual.get(0)); + TestHelpers.assertEqualsSafe(projectedSchema.asStruct(), expected.get(1), actual.get(1)); + } + @Test public void testManifestsTable() { TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test"); @@ -808,6 +875,64 @@ public void testManifestsTable() { TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(0), actual.get(0)); } + @Test + public void testPruneManifestsTable() { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test"); + Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build()); + Table manifestTable = loadTable(tableIdentifier, "manifests"); + Dataset df1 = spark.createDataFrame( + Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(null, "b")), SimpleRecord.class); + + df1.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + AssertHelpers.assertThrows("Can't prune struct inside list", SparkException.class, + "Cannot perform a projection of a list", + () -> spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "manifests")) + .select("partition_spec_id", "path", "partition_summaries.contains_null") + .collectAsList()); + + Dataset actualDf = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "manifests")) + .select("partition_spec_id", "path", "partition_summaries"); + + Schema projectedSchema = SparkSchemaUtil.convert(actualDf.schema()); + + List actual = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "manifests")) + .select("partition_spec_id", "path", "partition_summaries") + .collectAsList(); + + table.refresh(); + + GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(projectedSchema.asStruct())); + GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert( + projectedSchema.findType("partition_summaries.element").asStructType(), "partition_summary")); + List expected = Lists.transform(table.currentSnapshot().allManifests(), manifest -> + builder.set("partition_spec_id", manifest.partitionSpecId()) + .set("path", manifest.path()) + .set("partition_summaries", Lists.transform(manifest.partitions(), partition -> + summaryBuilder + .set("contains_null", true) + .set("contains_nan", false) + .set("lower_bound", "1") + .set("upper_bound", "1") + .build() + )) + .build() + ); + + Assert.assertEquals("Manifests table should have one manifest row", 1, actual.size()); + TestHelpers.assertEqualsSafe(projectedSchema.asStruct(), expected.get(0), actual.get(0)); + } + + @Test public void testAllManifestsTable() { TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test"); From 13622b88726f668a8aa5be37cddd53d42bf728bf Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Wed, 4 Aug 2021 14:18:08 -0500 Subject: [PATCH 07/11] Checkstyle --- core/src/main/java/org/apache/iceberg/AllEntriesTable.java | 1 - core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java | 1 - 2 files changed, 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java index c82c7bdf7d08..c1b714534def 100644 --- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java +++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java @@ -28,7 +28,6 @@ import org.apache.iceberg.io.CloseableIterable; import 
org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.ParallelIterable; import org.apache.iceberg.util.ThreadPools; diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java index a09a0855c16a..7bae3491a787 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java +++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java @@ -29,7 +29,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.types.Types.StructType; import org.apache.iceberg.util.StructProjection; /** From 8593a92a9543b752ef5dfb7758ed0f3e3d86d5e0 Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Wed, 4 Aug 2021 14:51:13 -0500 Subject: [PATCH 08/11] Change StaticDataTask Signature --- .../org/apache/iceberg/AllManifestsTable.java | 7 +++---- .../java/org/apache/iceberg/HistoryTable.java | 7 +++---- .../org/apache/iceberg/ManifestsTable.java | 7 +++---- .../org/apache/iceberg/PartitionsTable.java | 14 ++++++-------- .../org/apache/iceberg/SnapshotsTable.java | 7 +++---- .../org/apache/iceberg/StaticDataTask.java | 18 +++++++++--------- 6 files changed, 27 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java index f17132b16d13..68439295817e 100644 --- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java +++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java @@ -140,10 +140,9 @@ protected CloseableIterable planFiles( } else { return StaticDataTask.of( ops.io().newInputFile(ops.current().metadataFileLocation()), - snap.allManifests(), - manifest -> ManifestsTable.manifestFileToRow(table().spec(), manifest), - MANIFEST_FILE_SCHEMA, - schema()); + MANIFEST_FILE_SCHEMA, schema(), snap.allManifests(), + manifest -> ManifestsTable.manifestFileToRow(table().spec(), manifest) + ); } })); } diff --git a/core/src/main/java/org/apache/iceberg/HistoryTable.java b/core/src/main/java/org/apache/iceberg/HistoryTable.java index 5f98b6d0902f..9d0b0a6d7daf 100644 --- a/core/src/main/java/org/apache/iceberg/HistoryTable.java +++ b/core/src/main/java/org/apache/iceberg/HistoryTable.java @@ -68,10 +68,9 @@ private DataTask task(TableScan scan) { TableOperations ops = operations(); return StaticDataTask.of( ops.io().newInputFile(ops.current().metadataFileLocation()), - ops.current().snapshotLog(), - convertHistoryEntryFunc(table()), - schema(), - scan.schema()); + schema(), scan.schema(), ops.current().snapshotLog(), + convertHistoryEntryFunc(table()) + ); } private class HistoryScan extends StaticTableScan { diff --git a/core/src/main/java/org/apache/iceberg/ManifestsTable.java b/core/src/main/java/org/apache/iceberg/ManifestsTable.java index 855bea0eedd4..a818840c8a7e 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestsTable.java +++ b/core/src/main/java/org/apache/iceberg/ManifestsTable.java @@ -75,10 +75,9 @@ protected DataTask task(TableScan scan) { String location = scan.snapshot().manifestListLocation(); return StaticDataTask.of( ops.io().newInputFile(location != null ? 
From 8593a92a9543b752ef5dfb7758ed0f3e3d86d5e0 Mon Sep 17 00:00:00 2001
From: Russell_Spitzer
Date: Wed, 4 Aug 2021 14:51:13 -0500
Subject: [PATCH 08/11] Change StaticDataTask Signature

---
 .../org/apache/iceberg/AllManifestsTable.java |  7 +++----
 .../java/org/apache/iceberg/HistoryTable.java |  7 +++----
 .../org/apache/iceberg/ManifestsTable.java    |  7 +++----
 .../org/apache/iceberg/PartitionsTable.java   | 14 ++++++--------
 .../org/apache/iceberg/SnapshotsTable.java    |  7 +++----
 .../org/apache/iceberg/StaticDataTask.java    | 18 +++++++++---------
 6 files changed, 27 insertions(+), 33 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
index f17132b16d13..68439295817e 100644
--- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -140,10 +140,9 @@ protected CloseableIterable<FileScanTask> planFiles(
       } else {
         return StaticDataTask.of(
             ops.io().newInputFile(ops.current().metadataFileLocation()),
-            snap.allManifests(),
-            manifest -> ManifestsTable.manifestFileToRow(table().spec(), manifest),
-            MANIFEST_FILE_SCHEMA,
-            schema());
+            MANIFEST_FILE_SCHEMA, schema(), snap.allManifests(),
+            manifest -> ManifestsTable.manifestFileToRow(table().spec(), manifest)
+        );
       }
     }));
   }
diff --git a/core/src/main/java/org/apache/iceberg/HistoryTable.java b/core/src/main/java/org/apache/iceberg/HistoryTable.java
index 5f98b6d0902f..9d0b0a6d7daf 100644
--- a/core/src/main/java/org/apache/iceberg/HistoryTable.java
+++ b/core/src/main/java/org/apache/iceberg/HistoryTable.java
@@ -68,10 +68,9 @@ private DataTask task(TableScan scan) {
     TableOperations ops = operations();
     return StaticDataTask.of(
         ops.io().newInputFile(ops.current().metadataFileLocation()),
-        ops.current().snapshotLog(),
-        convertHistoryEntryFunc(table()),
-        schema(),
-        scan.schema());
+        schema(), scan.schema(), ops.current().snapshotLog(),
+        convertHistoryEntryFunc(table())
+    );
   }
 
   private class HistoryScan extends StaticTableScan {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestsTable.java b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
index 855bea0eedd4..a818840c8a7e 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
@@ -75,10 +75,9 @@ protected DataTask task(TableScan scan) {
     String location = scan.snapshot().manifestListLocation();
     return StaticDataTask.of(
         ops.io().newInputFile(location != null ? location : ops.current().metadataFileLocation()),
-        scan.snapshot().allManifests(),
-        manifest -> ManifestsTable.manifestFileToRow(spec, manifest),
-        schema(),
-        scan.schema());
+        schema(), scan.schema(), scan.snapshot().allManifests(),
+        manifest -> ManifestsTable.manifestFileToRow(spec, manifest)
+    );
   }
 
   private class ManifestsTableScan extends StaticTableScan {
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index e663c2c8548b..0215dfd45c00 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -78,17 +78,15 @@ private DataTask task(StaticTableScan scan) {
       // the table is unpartitioned, partitions contains only the root partition
       return StaticDataTask.of(
           io().newInputFile(ops.current().metadataFileLocation()),
-          partitions,
-          root -> StaticDataTask.Row.of(root.recordCount, root.fileCount),
-          schema(),
-          scan.schema());
+          schema(), scan.schema(), partitions,
+          root -> StaticDataTask.Row.of(root.recordCount, root.fileCount)
+      );
     } else {
       return StaticDataTask.of(
           io().newInputFile(ops.current().metadataFileLocation()),
-          partitions,
-          PartitionsTable::convertPartition,
-          schema(),
-          scan.schema());
+          schema(), scan.schema(), partitions,
+          PartitionsTable::convertPartition
+      );
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotsTable.java b/core/src/main/java/org/apache/iceberg/SnapshotsTable.java
index d47d651db774..4bb83afedcff 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotsTable.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotsTable.java
@@ -60,10 +60,9 @@ private DataTask task(BaseTableScan scan) {
     TableOperations ops = operations();
     return StaticDataTask.of(
         ops.io().newInputFile(ops.current().metadataFileLocation()),
-        ops.current().snapshots(),
-        SnapshotsTable::snapshotToRow,
-        schema(),
-        scan.schema());
+        schema(), scan.schema(), ops.current().snapshots(),
+        SnapshotsTable::snapshotToRow
+    );
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/StaticDataTask.java b/core/src/main/java/org/apache/iceberg/StaticDataTask.java
index 716fb41ffc30..3aabd728d06f 100644
--- a/core/src/main/java/org/apache/iceberg/StaticDataTask.java
+++ b/core/src/main/java/org/apache/iceberg/StaticDataTask.java
@@ -34,21 +34,21 @@
 class StaticDataTask implements DataTask {
 
-  static <T> DataTask of(InputFile metadata, Iterable<T> values, Function<T, Row> transform,
-      Schema original, Schema projected) {
+  static <T> DataTask of(InputFile metadata, Schema tableSchema, Schema projectedSchema, Iterable<T> values,
+      Function<T, Row> transform) {
     return new StaticDataTask(metadata,
-        Lists.newArrayList(Iterables.transform(values, transform::apply)).toArray(new Row[0]),
-        original,
-        projected);
+        tableSchema,
+        projectedSchema,
+        Lists.newArrayList(Iterables.transform(values, transform::apply)).toArray(new Row[0]));
   }
 
   private final DataFile metadataFile;
   private final StructLike[] rows;
-  private final Schema original;
+  private final Schema tableSchema;
   private final Schema projectedSchema;
 
-  private StaticDataTask(InputFile metadata, StructLike[] rows, Schema original, Schema projectedSchema) {
-    this.original = original;
+  private StaticDataTask(InputFile metadata, Schema tableSchema, Schema projectedSchema, StructLike[] rows) {
+    this.tableSchema = tableSchema;
     this.projectedSchema = projectedSchema;
     this.metadataFile = DataFiles.builder(PartitionSpec.unpartitioned())
         .withInputFile(metadata)
@@ -65,7 +65,7 @@ public List<DeleteFile> deletes() {
 
   @Override
   public CloseableIterable<StructLike> rows() {
-    StructProjection projection = StructProjection.create(original, projectedSchema);
+    StructProjection projection = StructProjection.create(tableSchema, projectedSchema);
     Iterable<StructLike> projectedRows = Iterables.transform(Arrays.asList(rows), projection::wrap);
     return CloseableIterable.withNoopClose(projectedRows);
   }
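The rows() method above resolves a StructProjection once from the table and projected schemas, then applies it to every row via projection::wrap. Below is a rough sketch of that resolve-once, remap-per-row idea using plain arrays and hypothetical names rather than Iceberg's StructLike; unlike the real StructProjection, this toy copies values instead of wrapping lazily.

    import java.util.Arrays;
    import java.util.List;

    final class PositionalProjection {
      private final int[] positions;

      // Resolve the projected field positions once, up front.
      PositionalProjection(List<String> tableFields, List<String> projectedFields) {
        this.positions = projectedFields.stream()
            .mapToInt(tableFields::indexOf)
            .toArray();
      }

      // Remap one row through the precomputed position table.
      Object[] project(Object[] row) {
        Object[] projected = new Object[positions.length];
        for (int i = 0; i < positions.length; i++) {
          projected[i] = row[positions[i]];
        }
        return projected;
      }

      public static void main(String[] args) {
        PositionalProjection proj = new PositionalProjection(
            Arrays.asList("id", "data", "ts"), Arrays.asList("ts", "id"));
        System.out.println(Arrays.toString(proj.project(new Object[] {1, "a", "2021"})));
        // prints [2021, 1]
      }
    }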
From 7374d9cafef00647de71cdedc93a025811ba424f Mon Sep 17 00:00:00 2001
From: Russell_Spitzer
Date: Wed, 4 Aug 2021 15:47:58 -0500
Subject: [PATCH 09/11] Reviewer Comments

---
 .../apache/iceberg/util/StructProjection.java       | 18 +++++++++---------
 .../source/TestIcebergSourceTablesBase.java         |  2 +-
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/util/StructProjection.java b/api/src/main/java/org/apache/iceberg/util/StructProjection.java
index 5050f84c6874..be05b0fe2db5 100644
--- a/api/src/main/java/org/apache/iceberg/util/StructProjection.java
+++ b/api/src/main/java/org/apache/iceberg/util/StructProjection.java
@@ -88,26 +88,26 @@ private StructProjection(StructType structType, StructType projection) {
           MapType projectedMap = projectedField.type().asMapType();
           MapType originalMap = dataField.type().asMapType();
 
-          boolean keyProjectable = !projectedMap.keyType().isStructType() ||
+          boolean keyProjectable = !projectedMap.keyType().isNestedType() ||
               projectedMap.keyType().equals(originalMap.keyType());
-          boolean valueProjectable = !projectedMap.valueType().isStructType() ||
+          boolean valueProjectable = !projectedMap.valueType().isNestedType() ||
               projectedMap.valueType().equals(originalMap.valueType());
-
           Preconditions.checkArgument(keyProjectable && valueProjectable,
-              "Cannot perform a projection of a map unless key and value types are primitive or a " +
-              "struct which is fully projected. Trying to project %s out of %s", projectedField, dataField);
+              "Cannot project a partial map key or value struct. Trying to project %s out of %s",
+              projectedField, dataField);
+
           nestedProjections[pos] = null;
           break;
 
         case LIST:
           ListType projectedList = projectedField.type().asListType();
           ListType originalList = dataField.type().asListType();
 
-          boolean elementProjectable = !projectedList.elementType().isStructType() ||
+          boolean elementProjectable = !projectedList.elementType().isNestedType() ||
               projectedList.elementType().equals(originalList.elementType());
-
           Preconditions.checkArgument(elementProjectable,
-              "Cannot perform a projection of a list unless it's element is a primitive or a struct which is " +
-              "fully projected. Trying to project %s out of %s", projectedField, dataField);
+              "Cannot project a partial list element struct. Trying to project %s out of %s",
+              projectedField, dataField);
+
           nestedProjections[pos] = null;
           break;
 
         default:
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 31ad0a440792..fd2329b2fb46 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -889,7 +889,7 @@ public void testPruneManifestsTable() {
         .save(loadLocation(tableIdentifier));
 
     AssertHelpers.assertThrows("Can't prune struct inside list", SparkException.class,
-        "Cannot perform a projection of a list",
+        "Cannot project a partial list element struct",
         () -> spark.read()
             .format("iceberg")
             .load(loadLocation(tableIdentifier, "manifests"))
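The preconditions above encode one rule: a struct nested inside a map or list either projects whole or not at all, since there is no way to bind a partial element struct inside a collection. Restated as a hypothetical helper against Iceberg's public type API (the class and method names are mine, not Iceberg's):

    import org.apache.iceberg.types.Type;
    import org.apache.iceberg.types.Types;

    final class NestedProjectionRules {
      // A projected list element is usable as-is when it is not a nested type,
      // or when it matches the original element type exactly (fully projected).
      static boolean elementProjectable(Types.ListType projected, Types.ListType original) {
        Type element = projected.elementType();
        return !element.isNestedType() || element.equals(original.elementType());
      }

      // Same rule applied independently to a map's key and value types.
      static boolean entriesProjectable(Types.MapType projected, Types.MapType original) {
        boolean keyOk = !projected.keyType().isNestedType() ||
            projected.keyType().equals(original.keyType());
        boolean valueOk = !projected.valueType().isNestedType() ||
            projected.valueType().equals(original.valueType());
        return keyOk && valueOk;
      }
    }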
From 412b57e826467ea510205073fb8aa4a791a26780 Mon Sep 17 00:00:00 2001
From: Russell_Spitzer
Date: Wed, 4 Aug 2021 15:53:46 -0500
Subject: [PATCH 10/11] Extra newline

---
 .../apache/iceberg/spark/source/TestIcebergSourceTablesBase.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index fd2329b2fb46..6d3232ec68e5 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -932,7 +932,6 @@ public void testPruneManifestsTable() {
     TestHelpers.assertEqualsSafe(projectedSchema.asStruct(), expected.get(0), actual.get(0));
   }
 
-
   @Test
   public void testAllManifestsTable() {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test");

From 1fd8db8c08fee8d34bf74b47d1908800b5a25f6c Mon Sep 17 00:00:00 2001
From: Russell_Spitzer
Date: Wed, 4 Aug 2021 18:43:51 -0500
Subject: [PATCH 11/11] Fix Spark2 Test Failure

---
 .../source/TestIcebergSourceTablesBase.java        | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 6d3232ec68e5..e4ca09f1fec8 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -888,13 +888,16 @@ public void testPruneManifestsTable() {
         .mode("append")
         .save(loadLocation(tableIdentifier));
 
-    AssertHelpers.assertThrows("Can't prune struct inside list", SparkException.class,
-        "Cannot project a partial list element struct",
-        () -> spark.read()
-            .format("iceberg")
-            .load(loadLocation(tableIdentifier, "manifests"))
-            .select("partition_spec_id", "path", "partition_summaries.contains_null")
-            .collectAsList());
+    if (!spark.version().startsWith("2")) {
+      // Spark 2 is not able to push down nested struct projections, so this will not break
+      AssertHelpers.assertThrows("Can't prune struct inside list", SparkException.class,
+          "Cannot project a partial list element struct",
+          () -> spark.read()
+              .format("iceberg")
+              .load(loadLocation(tableIdentifier, "manifests"))
+              .select("partition_spec_id", "path", "partition_summaries.contains_null")
+              .collectAsList());
+    }
 
     Dataset<Row> actualDf = spark.read()
         .format("iceberg")
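The final patch gates the failing assertion on the Spark major version: Spark 2 never pushes the nested projection down, so the partial select succeeds there and the expected exception only applies on Spark 3. A minimal sketch of that spark.version() check follows; the session setup is illustrative only.

    import org.apache.spark.sql.SparkSession;

    public class SparkVersionGate {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[1]")
            .appName("version-gate")
            .getOrCreate();

        // spark.version() returns a string like "3.1.2"; branching on the major
        // version mirrors the guard added in the test above.
        if (spark.version().startsWith("2")) {
          // Spark 2 reads the full nested column, so a partial select succeeds.
          System.out.println("Spark 2: nested projection is not pushed down");
        } else {
          // Spark 3.1+ can push the nested projection into the data source,
          // which is where a partial struct inside a list is rejected.
          System.out.println("Spark 3+: nested projection may be pushed down");
        }
        spark.stop();
      }
    }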