diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
index e4791ee02cdc..36b519d79e72 100644
--- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -38,6 +38,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 
 public class TypeUtil {
 
@@ -130,15 +131,47 @@ private static Set<Integer> getIdsInternal(Type type, boolean includeStructIds)
     return visit(type, new GetProjectedIds(includeStructIds));
   }
 
+  private static Set<Integer> innerCheck(Set<Integer> filteredIds, Types.NestedField innerField) {
+    // If the field is a struct and every one of its fields is marked for removal,
+    // keep the struct whole by dropping its field ids from the filtered set.
+    if (innerField.type().isStructType()) {
+      if (innerField.type().asStructType().fields().stream()
+          .allMatch(f -> filteredIds.contains(f.fieldId()))) {
+        innerField.type().asStructType().fields()
+            .forEach(f -> filteredIds.remove(f.fieldId()));
+      }
+      innerField.type().asStructType().fields().forEach(f -> innerCheck(filteredIds, f));
+    }
+    return filteredIds;
+  }
+
+  private static void filter(Set<Integer> projectedIds, Types.StructType struct, Set<Integer> fieldIds,
+      boolean doesNeedToKeepInnerStruct) {
+    Set<Integer> filteredIds = Sets.newHashSet(fieldIds);
+    if (doesNeedToKeepInnerStruct) {
+      struct.fields().forEach(f -> innerCheck(filteredIds, f));
+    }
+    projectedIds.removeAll(filteredIds);
+  }
+
   public static Types.StructType selectNot(Types.StructType struct, Set<Integer> fieldIds) {
+    return selectNot(struct, fieldIds, false);
+  }
+
+  public static Types.StructType selectNot(Types.StructType struct, Set<Integer> fieldIds,
+      boolean doesNeedToKeepInnerStruct) {
     Set<Integer> projectedIds = getIdsInternal(struct, false);
-    projectedIds.removeAll(fieldIds);
+    filter(projectedIds, struct, fieldIds, doesNeedToKeepInnerStruct);
     return project(struct, projectedIds);
   }
 
   public static Schema selectNot(Schema schema, Set<Integer> fieldIds) {
+    return selectNot(schema, fieldIds, false);
+  }
+
+  public static Schema selectNot(Schema schema, Set<Integer> fieldIds, boolean doesNeedToKeepInnerStruct) {
     Set<Integer> projectedIds = getIdsInternal(schema.asStruct(), false);
-    projectedIds.removeAll(fieldIds);
+    filter(projectedIds, schema.asStruct(), fieldIds, doesNeedToKeepInnerStruct);
     return project(schema, projectedIds);
   }
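The semantics of the new flag, as a minimal runnable sketch (not part of the patch; the schema and class name are illustrative, and `selectNot` with the boolean argument is the overload added above):

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.required;

class SelectNotSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        required(1, "id", Types.LongType.get()),
        required(2, "location", Types.StructType.of(
            required(3, "lat", Types.DoubleType.get()),
            required(4, "long", Types.DoubleType.get()))));

    // Default behavior: both nested fields are dropped, leaving location as an empty struct.
    System.out.println(TypeUtil.selectNot(schema, Sets.newHashSet(3, 4)).asStruct());

    // With doesNeedToKeepInnerStruct=true, a struct whose fields would all be
    // removed is kept whole, so the result equals the original schema.
    System.out.println(TypeUtil.selectNot(schema, Sets.newHashSet(3, 4), true).asStruct());
  }
}
```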
diff --git a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
index 210efd352f5b..ab1c513c329d 100644
--- a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
+++ b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
@@ -482,4 +482,59 @@ public void testSelectNot() {
     Schema actualNoStruct = TypeUtil.selectNot(schema, Sets.newHashSet(2));
     Assert.assertEquals(schema.asStruct(), actualNoStruct.asStruct());
   }
+
+  @Test
+  public void testSelectNotInnerStruct() {
+    Schema schema = new Schema(
+        Lists.newArrayList(
+            required(1, "id", Types.LongType.get()),
+            required(2, "location", Types.StructType.of(
+                required(3, "lat", Types.DoubleType.get()),
+                required(4, "long", Types.DoubleType.get()),
+                required(5, "address", Types.StructType.of(
+                    required(6, "number", Types.DoubleType.get()),
+                    required(7, "street", Types.DoubleType.get())))))));
+
+    // Expect a field inside a struct to be filtered when the rest of the struct survives
+    Schema expectedFilteredStruct = new Schema(
+        Lists.newArrayList(
+            required(1, "id", Types.LongType.get()),
+            required(2, "location", Types.StructType.of(
+                required(4, "long", Types.DoubleType.get()),
+                required(5, "address", Types.StructType.of(
+                    required(6, "number", Types.DoubleType.get()),
+                    required(7, "street", Types.DoubleType.get())))))));
+    Schema actualFilteredStruct = TypeUtil.selectNot(schema, Sets.newHashSet(3),
+        true /* doesNeedToKeepInnerStruct */);
+    Assert.assertEquals(expectedFilteredStruct.asStruct(), actualFilteredStruct.asStruct());
+
+    // Expect a struct whose fields would all be filtered to be kept whole
+    Schema actualNotInnerStruct = TypeUtil.selectNot(schema, Sets.newHashSet(3, 4, 5),
+        true /* doesNeedToKeepInnerStruct */);
+    Assert.assertEquals(schema.asStruct(), actualNotInnerStruct.asStruct());
+
+    Schema actualNotSubInnerStruct = TypeUtil.selectNot(schema, Sets.newHashSet(6, 7),
+        true /* doesNeedToKeepInnerStruct */);
+    Assert.assertEquals(schema.asStruct(), actualNotSubInnerStruct.asStruct());
+
+    Schema expectedFilteredInnerStruct = new Schema(
+        Lists.newArrayList(
+            required(1, "id", Types.LongType.get()),
+            required(2, "location", Types.StructType.of(
+                required(3, "lat", Types.DoubleType.get()),
+                required(4, "long", Types.DoubleType.get()),
+                required(5, "address", Types.StructType.of(
+                    required(7, "street", Types.DoubleType.get())))))));
+
+    Schema actualSubInnerStruct = TypeUtil.selectNot(schema, Sets.newHashSet(6),
+        true /* doesNeedToKeepInnerStruct */);
+    Assert.assertEquals(expectedFilteredInnerStruct.asStruct(), actualSubInnerStruct.asStruct());
+  }
 }
diff --git a/data/src/main/java/org/apache/iceberg/data/GenericReader.java b/data/src/main/java/org/apache/iceberg/data/GenericReader.java
index 913e7835f515..f741929040bc 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericReader.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericReader.java
@@ -125,7 +125,7 @@ private CloseableIterable<Record> openFile(FileScanTask task, Schema fileProject
       case ORC:
         Schema projectionWithoutConstantAndMetadataFields = TypeUtil.selectNot(fileProjection,
-            Sets.union(partition.keySet(), MetadataColumns.metadataFieldIds()));
+            Sets.union(partition.keySet(), MetadataColumns.metadataFieldIds()), true);
         ORC.ReadBuilder orc = ORC.read(input)
             .project(projectionWithoutConstantAndMetadataFields)
             .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(fileProjection, fileSchema, partition))
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index b71f2b0fafe5..a2a1bb328a0e 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -149,7 +149,7 @@ private CloseableIterable<RowData> newParquetIterable(
   private CloseableIterable<RowData> newOrcIterable(
       FileScanTask task, Schema schema, Map<Integer, ?> idToConstant, InputFilesDecryptor inputFilesDecryptor) {
     Schema readSchemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(schema,
-        Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
+        Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()), true);
 
     ORC.ReadBuilder builder = ORC.read(inputFilesDecryptor.getInputFile(task))
         .project(readSchemaWithoutConstantAndMetadataFields)
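The GenericReader and Flink changes above, and the MR and Spark reader changes below, all make the same one-line change to the ORC projection. A condensed sketch of the shared pattern (the class and method names here are illustrative, not part of the patch):

```java
import java.util.Map;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;

class OrcProjectionSketch {
  // Drop constant and metadata fields from the ORC read schema, but keep any
  // struct that would otherwise end up empty, so ORC can still project it.
  static Schema orcProjection(Schema expectedSchema, Map<Integer, ?> idToConstant) {
    return TypeUtil.selectNot(expectedSchema,
        Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()), true);
  }
}
```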
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index ccfbb6a31006..b0712c8780df 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -393,7 +393,7 @@ private CloseableIterable<T> newParquetIterable(InputFile inputFile, FileScanTas
     private CloseableIterable<T> newOrcIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
       Map<Integer, ?> idToConstant = constantsMap(task, IdentityPartitionConverters::convertConstant);
       Schema readSchemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(readSchema,
-          Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
+          Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()), true);
 
       CloseableIterable<T> orcIterator = null;
       // ORC does not support reuse containers yet
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java
index 9d630508b6e4..9f091871d2a0 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.spark.sql.connector.catalog.CatalogManager;
 import org.apache.spark.sql.connector.catalog.Identifier;
@@ -399,6 +400,26 @@ public void testSparkTableAddDropPartitions() throws Exception {
     Assert.assertEquals("spark table partition should be empty", 0, sparkTable().partitioning().length);
   }
 
+  @Test
+  public void testAddPartitionStruct() {
+    FORMATS.forEach(this::testAddPartitionInAStruct);
+  }
+
+  private void testAddPartitionInAStruct(String format) {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("CREATE TABLE %s (id INT, st struct<id:int, data:string>) USING iceberg " +
+        "TBLPROPERTIES ('write.format.default'='%s')", tableName, format);
+    sql("INSERT INTO TABLE %s VALUES (1, named_struct('id', 10, 'data', 'v1'))," +
+        "(2, named_struct('id', 20, 'data', 'v2'))", tableName);
+
+    sql("ALTER TABLE %s ADD PARTITION FIELD st.data as data", tableName);
+
+    sql("INSERT INTO TABLE %s VALUES (1, named_struct('id', 11, 'data', 'v1'))", tableName);
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(row("v1"), row("v2"));
+    assertEquals("Should have expected rows", expectedRows,
+        sql("select distinct st.data from %s order by data", tableName));
+  }
+
   private void assertPartitioningEquals(SparkTable table, int len, String transform) {
     Assert.assertEquals("spark table partition should be " + len, len, table.partitioning().length);
     Assert.assertEquals("latest spark table partition transform should match",
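The Spark test above exercises the motivating scenario: once st.data is an identity partition source, its id is treated as a read-time constant and removed from the ORC projection, so the enclosing struct must survive partial pruning. A sketch of that pruning step in isolation (not part of the patch; the field ids are assumptions for illustration):

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.optional;

class PartitionPruningSketch {
  public static void main(String[] args) {
    // The table from testAddPartitionInAStruct, with assumed field ids.
    Schema table = new Schema(
        optional(1, "id", Types.IntegerType.get()),
        optional(2, "st", Types.StructType.of(
            optional(3, "id", Types.IntegerType.get()),
            optional(4, "data", Types.StringType.get()))));

    // st.data (id 4) is read as a partition constant; the reader projects the
    // rest of st from the file and re-attaches the constant afterwards.
    Schema projection = TypeUtil.selectNot(table, Sets.newHashSet(4), true);
    System.out.println(projection.asStruct()); // st keeps its remaining "id" field
  }
}
```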
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 5f643fa37d5d..c2be11671a8f 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -103,7 +103,8 @@ CloseableIterator<ColumnarBatch> open(FileScanTask task) {
       Set<Integer> constantFieldIds = idToConstant.keySet();
       Set<Integer> metadataFieldIds = MetadataColumns.metadataFieldIds();
       Sets.SetView<Integer> constantAndMetadataFieldIds = Sets.union(constantFieldIds, metadataFieldIds);
-      Schema schemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(expectedSchema, constantAndMetadataFieldIds);
+      Schema schemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(
+          expectedSchema, constantAndMetadataFieldIds, true);
       ORC.ReadBuilder builder = ORC.read(location)
           .project(schemaWithoutConstantAndMetadataFields)
           .split(task.start(), task.length())
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 4f5962494feb..b7be49c5751d 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -155,7 +155,7 @@ private CloseableIterable<InternalRow> newOrcIterable(
       Schema readSchema,
       Map<Integer, ?> idToConstant) {
     Schema readSchemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(readSchema,
-        Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
+        Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()), true);
 
     ORC.ReadBuilder builder = ORC.read(location)
         .project(readSchemaWithoutConstantAndMetadataFields)
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
index 8022e8696b63..30ef756a31be 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -54,6 +55,7 @@ public abstract class SparkTestBase {
 
   protected static final Object ANY = new Object();
+  protected static final List<String> FORMATS = Lists.newArrayList("orc", "parquet", "avro");
 
   protected static TestHiveMetastore metastore = null;
   protected static HiveConf hiveConf = null;
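The new FORMATS constant lets suites run one scenario per file format, as testAddPartitionStruct does above. A minimal sketch of the intended pattern (class and helper names are illustrative, not part of the patch):

```java
import java.util.List;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class FormatLoopSketch {
  static final List<String> FORMATS = Lists.newArrayList("orc", "parquet", "avro");

  void runAll() {
    // Each format recreates the table via 'write.format.default' and reruns
    // the same assertions, mirroring testAddPartitionInAStruct above.
    FORMATS.forEach(this::runForFormat);
  }

  private void runForFormat(String format) {
    // per-format setup and assertions elided
  }
}
```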