From e2b003af48aeac492d342ce1ce6b09e2323f747c Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Sat, 7 Aug 2021 10:39:25 -0500 Subject: [PATCH 1/7] API: Change GetProjectedIds to Return all Ids Previously getProjectedIds would only return leaf nodes and primitives that were selected. This made it impossible to return empty structs. To fix this we change the behavior to return all id's of required fields including structs. This in turn requires fixing the alternate PruneColumn methods for Avro and Parquet to respect that they will now have selected field ID's for non primtiive nodes. Previous use cases of TypeUtil.select are converted to TypeUtil.project, which inverses this new getProjecetedIds code. --- .../apache/iceberg/types/GetProjectedIds.java | 4 +- .../apache/iceberg/util/StructProjection.java | 2 +- .../apache/iceberg/types/TestTypeUtil.java | 50 ++--- .../org/apache/iceberg/BaseTableScan.java | 2 +- .../org/apache/iceberg/avro/PruneColumns.java | 47 ++++- .../org/apache/iceberg/TestSchemaUpdate.java | 2 +- .../iceberg/avro/TestReadProjection.java | 186 +++++++++++++++++- .../apache/iceberg/parquet/PruneColumns.java | 7 +- .../source/TestIcebergSourceTablesBase.java | 45 ++++- 9 files changed, 288 insertions(+), 57 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/types/GetProjectedIds.java b/api/src/main/java/org/apache/iceberg/types/GetProjectedIds.java index d50c45e22e6b..e5a483aa53ec 100644 --- a/api/src/main/java/org/apache/iceberg/types/GetProjectedIds.java +++ b/api/src/main/java/org/apache/iceberg/types/GetProjectedIds.java @@ -39,9 +39,7 @@ public Set struct(Types.StructType struct, List> fieldResu @Override public Set field(Types.NestedField field, Set fieldResult) { - if (fieldResult == null) { - fieldIds.add(field.fieldId()); - } + fieldIds.add(field.fieldId()); return fieldIds; } diff --git a/api/src/main/java/org/apache/iceberg/util/StructProjection.java b/api/src/main/java/org/apache/iceberg/util/StructProjection.java index be05b0fe2db5..1bc10ac3755c 100644 --- a/api/src/main/java/org/apache/iceberg/util/StructProjection.java +++ b/api/src/main/java/org/apache/iceberg/util/StructProjection.java @@ -42,7 +42,7 @@ public class StructProjection implements StructLike { */ public static StructProjection create(Schema schema, Set ids) { StructType structType = schema.asStruct(); - return new StructProjection(structType, TypeUtil.select(structType, ids)); + return new StructProjection(structType, TypeUtil.project(structType, ids)); } /** diff --git a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java index f9d9ef13e565..1743c80973a0 100644 --- a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java +++ b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java @@ -21,6 +21,7 @@ package org.apache.iceberg.types; import org.apache.iceberg.AssertHelpers; +import java.util.Set; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -323,48 +324,25 @@ public void testProjectMap() { } @Test - public void testProjectList() { + public void testGetProjectedIds() { Schema schema = new Schema( Lists.newArrayList( required(10, "a", Types.IntegerType.get()), required(11, "A", Types.IntegerType.get()), - required(12, "list", Types.ListType.ofRequired(13, - Types.StructType.of( - optional(20, "foo", Types.IntegerType.get()), - required(21, "subList", Types.ListType.ofRequired(14, - Types.StructType.of( - required(15, "x", Types.IntegerType.get()), - required(16, "y", Types.IntegerType.get()), - required(17, "z", Types.IntegerType.get()))))))))); - - - AssertHelpers.assertThrows("Cannot explicitly project List", - IllegalArgumentException.class, - () -> TypeUtil.project(schema, Sets.newHashSet(12)) - ); - - AssertHelpers.assertThrows("Cannot explicitly project List", - IllegalArgumentException.class, - () -> TypeUtil.project(schema, Sets.newHashSet(21)) - ); + required(35, "emptyStruct", Types.StructType.of()), + required(12, "someStruct", Types.StructType.of( + required(13, "b", Types.IntegerType.get()), + required(14, "B", Types.IntegerType.get()), + required(15, "anotherStruct", Types.StructType.of( + required(16, "c", Types.IntegerType.get()), + required(17, "C", Types.IntegerType.get())) + ))))); - Schema expectedDepthOne = new Schema( - Lists.newArrayList( - required(12, "list", Types.ListType.ofRequired(13, - Types.StructType.of())))); - Schema actualDepthOne = TypeUtil.project(schema, Sets.newHashSet(13)); - Assert.assertEquals(expectedDepthOne.asStruct(), actualDepthOne.asStruct()); + // Should select everything except 12 and 15 since they are structs with selected elements + Set expectedIds = Sets.newHashSet(10, 11, 35, 13, 14, 16, 17); + Set actualIds = TypeUtil.getProjectedIds(schema); - Schema expectedDepthTwo = new Schema( - Lists.newArrayList( - required(10, "a", Types.IntegerType.get()), - required(12, "list", Types.ListType.ofRequired(13, - Types.StructType.of( - optional(20, "foo", Types.IntegerType.get()), - required(21, "subList", Types.ListType.ofRequired(14, - Types.StructType.of()))))))); - Schema actualDepthTwo = TypeUtil.project(schema, Sets.newHashSet(10, 13, 20, 14)); - Assert.assertEquals(expectedDepthTwo.asStruct(), actualDepthTwo.asStruct()); + Assert.assertEquals(expectedIds, actualIds); } @Test diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java index 356d909f6bba..524276b57427 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java @@ -296,7 +296,7 @@ private Schema lazyColumnProjection() { } requiredFieldIds.addAll(selectedIds); - return TypeUtil.select(schema, requiredFieldIds); + return TypeUtil.project(schema, requiredFieldIds); } else if (context.projectedSchema() != null) { return context.projectedSchema(); diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java index 57e2c2709137..697a0ca0b6fb 100644 --- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java +++ b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java @@ -25,6 +25,7 @@ import java.util.Set; import org.apache.avro.JsonProperties; import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; import org.apache.avro.SchemaNormalization; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -81,13 +82,21 @@ public Schema record(Schema record, List names, List fields) { Schema fieldSchema = fields.get(field.pos()); // All primitives are selected by selecting the field, but map and list - // types can be selected by projecting the keys, values, or elements. + // types can be selected by projecting the keys, values, or elements. Empty + // Structs can be selected by selecting the record itself instead of it's children. // This creates two conditions where the field should be selected: if the // id is selected or if the result of the field is non-null. The only // case where the converted field is non-null is when a map or list is // selected by lower IDs. if (selectedIds.contains(fieldId)) { - filteredFields.add(copyField(field, field.schema(), fieldId)); + if (isNestedRecord(field) && + (fieldSchema == null || isEmptySchema(field.schema()) || isEmptySchema(fieldSchema))) { + filteredFields.add(copyEmptyField(field, record.getNamespace(), fieldId)); + } else if (fieldSchema != null) { + filteredFields.add(copyField(field, fieldSchema, fieldId)); + } else { + filteredFields.add(copyField(field, field.schema(), fieldId)); + } } else if (fieldSchema != null) { hasChange = true; filteredFields.add(copyField(field, fieldSchema, fieldId)); @@ -105,6 +114,40 @@ public Schema record(Schema record, List names, List fields) { return null; } + private boolean isNestedRecord(Schema.Field field) { + if (AvroSchemaUtil.isOptionSchema(field.schema())) { + Schema optionSchema = AvroSchemaUtil.fromOption(field.schema()); + return optionSchema.getType() == Type.RECORD; + } else { + return field.schema().getType() == Type.RECORD; + } + } + + private boolean isEmptySchema(Schema schema) { + if (AvroSchemaUtil.isOptionSchema(schema)) { + Schema optionSchema = AvroSchemaUtil.fromOption(schema); + if (optionSchema.getType() == Type.RECORD) { + return optionSchema.getFields().isEmpty(); + } + } else if (schema.getType() == Type.RECORD) { + return schema.getFields().isEmpty(); + } + return false; + } + + private Schema.Field copyEmptyField(Schema.Field field, String namespace, Integer fieldId) { + Schema emptyRecordSchema = + Schema.createRecord("r" + fieldId, null, null, false, ImmutableList.of()); + + if (field.schema().isUnion()) { + return copyField(field, AvroSchemaUtil.toOption(emptyRecordSchema), fieldId); + } else if (field.schema().getType() == Type.RECORD) { + return copyField(field, emptyRecordSchema, fieldId); + } else { + throw new IllegalArgumentException(String.format("Cannot make an empty copy of a non Struct type, %s", field)); + } + } + @Override public Schema union(Schema union, List options) { Preconditions.checkState(AvroSchemaUtil.isOptionSchema(union), diff --git a/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java b/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java index 4aa5bfd335ec..4b0bed1cde48 100644 --- a/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java +++ b/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java @@ -93,7 +93,7 @@ public void testDeleteFields() { Schema del = new SchemaUpdate(SCHEMA, 19).deleteColumn(name).apply(); Assert.assertEquals("Should match projection with '" + name + "' removed", - TypeUtil.select(SCHEMA, selected).asStruct(), del.asStruct()); + TypeUtil.project(SCHEMA, selected).asStruct(), del.asStruct()); } } diff --git a/core/src/test/java/org/apache/iceberg/avro/TestReadProjection.java b/core/src/test/java/org/apache/iceberg/avro/TestReadProjection.java index e71034483bdc..bcb70026a0aa 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestReadProjection.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestReadProjection.java @@ -37,10 +37,11 @@ import org.junit.rules.TemporaryFolder; public abstract class TestReadProjection { + protected abstract Record writeAndRead(String desc, - Schema writeSchema, - Schema readSchema, - Record record) throws IOException; + Schema writeSchema, + Schema readSchema, + Record record) throws IOException; @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -526,4 +527,183 @@ public void testListOfStructsProjection() throws IOException { AssertHelpers.assertEmptyAvroField(projectedP2, "y"); Assert.assertNull("Should project null z", projectedP2.get("z")); } + + @Test + public void testEmptyStructProjection() throws Exception { + Schema writeSchema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(3, "location", Types.StructType.of( + Types.NestedField.required(1, "lat", Types.FloatType.get()), + Types.NestedField.required(2, "long", Types.FloatType.get()) + )) + ); + + Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table")); + record.put("id", 34L); + Record location = new Record( + AvroSchemaUtil.fromOption(record.getSchema().getField("location").schema())); + location.put("lat", 52.995143f); + location.put("long", -1.539054f); + record.put("location", location); + + Schema emptyStruct = new Schema( + Types.NestedField.required(3, "location", Types.StructType.of()) + ); + + Record projected = writeAndRead("empty_proj", writeSchema, emptyStruct, record); + AssertHelpers.assertEmptyAvroField(projected, "id"); + Record result = (Record) projected.get("location"); + + Assert.assertEquals("location should be in the 0th position", result, projected.get(0)); + Assert.assertNotNull("Should contain an empty record", result); + AssertHelpers.assertEmptyAvroField(result, "lat"); + AssertHelpers.assertEmptyAvroField(result, "long"); + } + + @Test + public void testEmptyStructRequiredProjection() throws Exception { + Schema writeSchema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.required(3, "location", Types.StructType.of( + Types.NestedField.required(1, "lat", Types.FloatType.get()), + Types.NestedField.required(2, "long", Types.FloatType.get()) + )) + ); + + Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table")); + record.put("id", 34L); + Record location = new Record(record.getSchema().getField("location").schema()); + location.put("lat", 52.995143f); + location.put("long", -1.539054f); + record.put("location", location); + + Schema emptyStruct = new Schema( + Types.NestedField.required(3, "location", Types.StructType.of()) + ); + + Record projected = writeAndRead("empty_req_proj", writeSchema, emptyStruct, record); + AssertHelpers.assertEmptyAvroField(projected, "id"); + Record result = (Record) projected.get("location"); + Assert.assertEquals("location should be in the 0th position", result, projected.get(0)); + Assert.assertNotNull("Should contain an empty record", result); + AssertHelpers.assertEmptyAvroField(result, "lat"); + AssertHelpers.assertEmptyAvroField(result, "long"); + } + + @Test + public void testRequiredEmptyStructInRequiredStruct() throws Exception { + Schema writeSchema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.required(3, "location", Types.StructType.of( + Types.NestedField.required(1, "lat", Types.FloatType.get()), + Types.NestedField.required(2, "long", Types.FloatType.get()), + Types.NestedField.required(4, "empty", Types.StructType.of()) + )) + ); + + Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table")); + record.put("id", 34L); + Record location = new Record(record.getSchema().getField("location").schema()); + location.put("lat", 52.995143f); + location.put("long", -1.539054f); + record.put("location", location); + + Schema emptyStruct = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.required(3, "location", Types.StructType.of( + Types.NestedField.required(4, "empty", Types.StructType.of()) + )) + ); + + Record projected = writeAndRead("req_empty_req_proj", writeSchema, emptyStruct, record); + Assert.assertEquals("Should project id", 34L, projected.get("id")); + Record result = (Record) projected.get("location"); + Assert.assertEquals("location should be in the 1st position", result, projected.get(1)); + Assert.assertNotNull("Should contain an empty record", result); + AssertHelpers.assertEmptyAvroField(result, "lat"); + AssertHelpers.assertEmptyAvroField(result, "long"); + Assert.assertNotNull("Should project empty", result.getSchema().getField("empty")); + Assert.assertNotNull("Empty should not be null", result.get("empty")); + Assert.assertEquals("Empty should be empty", 0, + ((Record) result.get("empty")).getSchema().getFields().size()); + } + + @Test + public void testEmptyNestedStructProjection() throws Exception { + Schema writeSchema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(3, "outer", Types.StructType.of( + Types.NestedField.required(1, "lat", Types.FloatType.get()), + Types.NestedField.optional(2, "inner", Types.StructType.of( + Types.NestedField.required(5, "lon", Types.FloatType.get()) + ) + ) + )) + ); + + Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table")); + record.put("id", 34L); + Record outer = new Record( + AvroSchemaUtil.fromOption(record.getSchema().getField("outer").schema())); + Record inner = new Record(AvroSchemaUtil.fromOption(outer.getSchema().getField("inner").schema())); + inner.put("lon", 32.14f); + outer.put("lat", 52.995143f); + outer.put("inner", inner); + record.put("outer", outer); + + Schema emptyStruct = new Schema( + Types.NestedField.required(3, "outer", Types.StructType.of( + Types.NestedField.required(2, "inner", Types.StructType.of()) + ))); + + Record projected = writeAndRead("nested_empty_proj", writeSchema, emptyStruct, record); + AssertHelpers.assertEmptyAvroField(projected, "id"); + Record outerResult = (Record) projected.get("outer"); + Assert.assertEquals("Outer should be in the 0th position", outerResult, projected.get(0)); + Assert.assertNotNull("Should contain the outer record", outerResult); + AssertHelpers.assertEmptyAvroField(outerResult, "lat"); + Record innerResult = (Record) outerResult.get("inner"); + Assert.assertEquals("Inner should be in the 0th position", innerResult, outerResult.get(0)); + Assert.assertNotNull("Should contain the inner record", innerResult); + AssertHelpers.assertEmptyAvroField(innerResult, "lon"); + } + + @Test + public void testEmptyNestedStructRequiredProjection() throws Exception { + Schema writeSchema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.required(3, "outer", Types.StructType.of( + Types.NestedField.required(1, "lat", Types.FloatType.get()), + Types.NestedField.required(2, "inner", Types.StructType.of( + Types.NestedField.required(5, "lon", Types.FloatType.get()) + ) + ) + )) + ); + + Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table")); + record.put("id", 34L); + Record outer = new Record(record.getSchema().getField("outer").schema()); + Record inner = new Record(outer.getSchema().getField("inner").schema()); + inner.put("lon", 32.14f); + outer.put("lat", 52.995143f); + outer.put("inner", inner); + record.put("outer", outer); + + Schema emptyStruct = new Schema( + Types.NestedField.required(3, "outer", Types.StructType.of( + Types.NestedField.required(2, "inner", Types.StructType.of()) + ))); + + Record projected = writeAndRead("nested_empty_req_proj", writeSchema, emptyStruct, record); + AssertHelpers.assertEmptyAvroField(projected, "id"); + Record outerResult = (Record) projected.get("outer"); + Assert.assertEquals("Outer should be in the 0th position", outerResult, projected.get(0)); + Assert.assertNotNull("Should contain the outer record", outerResult); + AssertHelpers.assertEmptyAvroField(outerResult, "lat"); + Record innerResult = (Record) outerResult.get("inner"); + Assert.assertEquals("Inner should be in the 0th position", innerResult, outerResult.get(0)); + Assert.assertNotNull("Should contain the inner record", innerResult); + AssertHelpers.assertEmptyAvroField(innerResult, "lon"); + } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java index acdda78c680b..f963b3446515 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java @@ -49,7 +49,12 @@ public Type message(MessageType message, List fields) { Type field = fields.get(i); Integer fieldId = getId(originalField); if (fieldId != null && selectedIds.contains(fieldId)) { - builder.addField(originalField); + if (field != null) { + hasChange = true; + builder.addField(field); + } else { + builder.addField(originalField); + } fieldCount += 1; } else if (field != null) { builder.addField(field); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index e4ca09f1fec8..10b9d6f3030c 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -68,6 +68,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase { optional(2, "data", Types.StringType.get()) ); + private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("id").build(); + @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -147,6 +149,31 @@ public void testEntriesTable() throws Exception { TestHelpers.assertEqualsSafe(entriesTable.schema().asStruct(), expected.get(0), actual.get(0)); } + @Test + public void testEntriesTablePartitionedPrune() throws Exception { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test"); + Table table = createTable(tableIdentifier, SCHEMA, SPEC); + + List records = Lists.newArrayList(new SimpleRecord(1, "1")); + + Dataset inputDf = spark.createDataFrame(records, SimpleRecord.class); + inputDf.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + table.refresh(); + + List actual = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "entries")) + .select("status") + .collectAsList(); + + Assert.assertEquals("Results should contain only one status", 1, actual.size()); + Assert.assertEquals("That status should be Added (1)", 1, actual.get(0).getInt(0)); + } + @Test public void testEntriesTableDataFilePrune() throws Exception { TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test"); @@ -312,7 +339,7 @@ public void testCountEntriesTable() { @Test public void testFilesTable() throws Exception { TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test"); - Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build()); + Table table = createTable(tableIdentifier, SCHEMA, SPEC); Table entriesTable = loadTable(tableIdentifier, "entries"); Table filesTable = loadTable(tableIdentifier, "files"); @@ -362,7 +389,7 @@ public void testFilesTableWithSnapshotIdInheritance() throws Exception { spark.sql("DROP TABLE IF EXISTS parquet_table"); TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_inheritance_test"); - Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build()); + Table table = createTable(tableIdentifier, SCHEMA, SPEC); table.updateProperties() .set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true") .commit(); @@ -422,7 +449,7 @@ public void testEntriesTableWithSnapshotIdInheritance() throws Exception { spark.sql("DROP TABLE IF EXISTS parquet_table"); TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_inheritance_test"); - PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build(); + PartitionSpec spec = SPEC; Table table = createTable(tableIdentifier, SCHEMA, spec); table.updateProperties() @@ -523,7 +550,7 @@ public void testFilesUnpartitionedTable() throws Exception { @Test public void testAllMetadataTablesWithStagedCommits() throws Exception { TableIdentifier tableIdentifier = TableIdentifier.of("db", "stage_aggregate_table_test"); - Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build()); + Table table = createTable(tableIdentifier, SCHEMA, SPEC); table.updateProperties().set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true").commit(); spark.conf().set("spark.wap.id", "1234567"); @@ -567,7 +594,7 @@ public void testAllMetadataTablesWithStagedCommits() throws Exception { @Test public void testAllDataFilesTable() throws Exception { TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test"); - Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build()); + Table table = createTable(tableIdentifier, SCHEMA, SPEC); Table entriesTable = loadTable(tableIdentifier, "entries"); Table filesTable = loadTable(tableIdentifier, "all_data_files"); @@ -831,7 +858,7 @@ public void testPrunedSnapshotsTable() { @Test public void testManifestsTable() { TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test"); - Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build()); + Table table = createTable(tableIdentifier, SCHEMA, SPEC); Table manifestTable = loadTable(tableIdentifier, "manifests"); Dataset df1 = spark.createDataFrame( Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(null, "b")), SimpleRecord.class); @@ -878,7 +905,7 @@ public void testManifestsTable() { @Test public void testPruneManifestsTable() { TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test"); - Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build()); + Table table = createTable(tableIdentifier, SCHEMA, SPEC); Table manifestTable = loadTable(tableIdentifier, "manifests"); Dataset df1 = spark.createDataFrame( Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(null, "b")), SimpleRecord.class); @@ -938,7 +965,7 @@ public void testPruneManifestsTable() { @Test public void testAllManifestsTable() { TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test"); - Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build()); + Table table = createTable(tableIdentifier, SCHEMA, SPEC); Table manifestTable = loadTable(tableIdentifier, "all_manifests"); Dataset df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class); @@ -1034,7 +1061,7 @@ public void testUnpartitionedPartitionsTable() { @Test public void testPartitionsTable() { TableIdentifier tableIdentifier = TableIdentifier.of("db", "partitions_test"); - Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build()); + Table table = createTable(tableIdentifier, SCHEMA, SPEC); Table partitionsTable = loadTable(tableIdentifier, "partitions"); Dataset df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class); Dataset df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class); From ec0fc4b29a515492753c851688e11dc2885515fc Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Mon, 9 Aug 2021 11:15:04 -0500 Subject: [PATCH 2/7] Remove unceccessary checks I was working on this too late at night. --- .../apache/iceberg/types/GetProjectedIds.java | 4 +- .../apache/iceberg/types/TestTypeUtil.java | 3 +- .../org/apache/iceberg/avro/PruneColumns.java | 40 +------------------ 3 files changed, 5 insertions(+), 42 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/types/GetProjectedIds.java b/api/src/main/java/org/apache/iceberg/types/GetProjectedIds.java index e5a483aa53ec..0d24b2b73327 100644 --- a/api/src/main/java/org/apache/iceberg/types/GetProjectedIds.java +++ b/api/src/main/java/org/apache/iceberg/types/GetProjectedIds.java @@ -39,7 +39,9 @@ public Set struct(Types.StructType struct, List> fieldResu @Override public Set field(Types.NestedField field, Set fieldResult) { - fieldIds.add(field.fieldId()); + if (field.type().isStructType() || field.type().isPrimitiveType()) { + fieldIds.add(field.fieldId()); + } return fieldIds; } diff --git a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java index 1743c80973a0..bf77b4ff886c 100644 --- a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java +++ b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java @@ -338,8 +338,7 @@ public void testGetProjectedIds() { required(17, "C", Types.IntegerType.get())) ))))); - // Should select everything except 12 and 15 since they are structs with selected elements - Set expectedIds = Sets.newHashSet(10, 11, 35, 13, 14, 16, 17); + Set expectedIds = Sets.newHashSet(10, 11, 35, 12, 13, 14, 15, 16, 17); Set actualIds = TypeUtil.getProjectedIds(schema); Assert.assertEquals(expectedIds, actualIds); diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java index 697a0ca0b6fb..36c2fd75fe82 100644 --- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java +++ b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java @@ -25,7 +25,6 @@ import java.util.Set; import org.apache.avro.JsonProperties; import org.apache.avro.Schema; -import org.apache.avro.Schema.Type; import org.apache.avro.SchemaNormalization; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -89,10 +88,7 @@ public Schema record(Schema record, List names, List fields) { // case where the converted field is non-null is when a map or list is // selected by lower IDs. if (selectedIds.contains(fieldId)) { - if (isNestedRecord(field) && - (fieldSchema == null || isEmptySchema(field.schema()) || isEmptySchema(fieldSchema))) { - filteredFields.add(copyEmptyField(field, record.getNamespace(), fieldId)); - } else if (fieldSchema != null) { + if (fieldSchema != null) { filteredFields.add(copyField(field, fieldSchema, fieldId)); } else { filteredFields.add(copyField(field, field.schema(), fieldId)); @@ -114,40 +110,6 @@ public Schema record(Schema record, List names, List fields) { return null; } - private boolean isNestedRecord(Schema.Field field) { - if (AvroSchemaUtil.isOptionSchema(field.schema())) { - Schema optionSchema = AvroSchemaUtil.fromOption(field.schema()); - return optionSchema.getType() == Type.RECORD; - } else { - return field.schema().getType() == Type.RECORD; - } - } - - private boolean isEmptySchema(Schema schema) { - if (AvroSchemaUtil.isOptionSchema(schema)) { - Schema optionSchema = AvroSchemaUtil.fromOption(schema); - if (optionSchema.getType() == Type.RECORD) { - return optionSchema.getFields().isEmpty(); - } - } else if (schema.getType() == Type.RECORD) { - return schema.getFields().isEmpty(); - } - return false; - } - - private Schema.Field copyEmptyField(Schema.Field field, String namespace, Integer fieldId) { - Schema emptyRecordSchema = - Schema.createRecord("r" + fieldId, null, null, false, ImmutableList.of()); - - if (field.schema().isUnion()) { - return copyField(field, AvroSchemaUtil.toOption(emptyRecordSchema), fieldId); - } else if (field.schema().getType() == Type.RECORD) { - return copyField(field, emptyRecordSchema, fieldId); - } else { - throw new IllegalArgumentException(String.format("Cannot make an empty copy of a non Struct type, %s", field)); - } - } - @Override public Schema union(Schema union, List options) { Preconditions.checkState(AvroSchemaUtil.isOptionSchema(union), From 6cb5b46b5edd115f9f554672719dda389fc47a63 Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Tue, 10 Aug 2021 16:44:05 -0500 Subject: [PATCH 3/7] Fix SelectNot The changed behavior of getProjectedIds and "select" means that selectNot needs to be implemented with Project. --- api/src/main/java/org/apache/iceberg/types/TypeUtil.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java index e93b10afa818..20b0b3f979f2 100644 --- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java +++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java @@ -132,13 +132,13 @@ private static Set getIdsInternal(Type type) { public static Types.StructType selectNot(Types.StructType struct, Set fieldIds) { Set projectedIds = getIdsInternal(struct); projectedIds.removeAll(fieldIds); - return select(struct, projectedIds); + return project(struct, projectedIds); } public static Schema selectNot(Schema schema, Set fieldIds) { Set projectedIds = getIdsInternal(schema.asStruct()); projectedIds.removeAll(fieldIds); - return select(schema, projectedIds); + return project(schema, projectedIds); } public static Schema join(Schema left, Schema right) { From b85ba90aca5e21c9c34f024d0fac250c899aa26a Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Tue, 7 Sep 2021 11:47:15 -0500 Subject: [PATCH 4/7] Rebase Fix --- api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java index bf77b4ff886c..a3da28bc31f4 100644 --- a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java +++ b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java @@ -20,8 +20,8 @@ package org.apache.iceberg.types; -import org.apache.iceberg.AssertHelpers; import java.util.Set; +import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; From 4f4e35a7004064a9ae76b7a98e7ee1b1a2006339 Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Thu, 16 Sep 2021 13:50:43 -0500 Subject: [PATCH 5/7] Reviewer Comments Have PruneColumns Avro mimic PruneColumns Iceberg Adjust TypeUtil.selectNot to better mimic the old behavior and added tests --- .../apache/iceberg/types/GetProjectedIds.java | 11 ++++++- .../org/apache/iceberg/types/TypeUtil.java | 10 +++++-- .../apache/iceberg/types/TestTypeUtil.java | 30 +++++++++++++++++++ .../org/apache/iceberg/avro/PruneColumns.java | 30 +++++++++++++++++-- .../iceberg/avro/TestReadProjection.java | 7 ++--- 5 files changed, 78 insertions(+), 10 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/types/GetProjectedIds.java b/api/src/main/java/org/apache/iceberg/types/GetProjectedIds.java index 0d24b2b73327..985663bf224e 100644 --- a/api/src/main/java/org/apache/iceberg/types/GetProjectedIds.java +++ b/api/src/main/java/org/apache/iceberg/types/GetProjectedIds.java @@ -25,8 +25,17 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; class GetProjectedIds extends TypeUtil.SchemaVisitor> { + private final boolean includeStructIds; private final Set fieldIds = Sets.newHashSet(); + GetProjectedIds() { + this(false); + } + + GetProjectedIds(boolean includeStructIds) { + this.includeStructIds = includeStructIds; + } + @Override public Set schema(Schema schema, Set structResult) { return fieldIds; @@ -39,7 +48,7 @@ public Set struct(Types.StructType struct, List> fieldResu @Override public Set field(Types.NestedField field, Set fieldResult) { - if (field.type().isStructType() || field.type().isPrimitiveType()) { + if ((includeStructIds && field.type().isStructType()) || field.type().isPrimitiveType()) { fieldIds.add(field.fieldId()); } return fieldIds; diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java index 20b0b3f979f2..2bf16a460e5e 100644 --- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java +++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java @@ -125,18 +125,22 @@ public static Set getProjectedIds(Type type) { return ImmutableSet.copyOf(getIdsInternal(type)); } + private static Set getIdsInternal(Type type, boolean includeStructIds) { + return visit(type, new GetProjectedIds(includeStructIds)); + } + private static Set getIdsInternal(Type type) { - return visit(type, new GetProjectedIds()); + return getIdsInternal(type, true); } public static Types.StructType selectNot(Types.StructType struct, Set fieldIds) { - Set projectedIds = getIdsInternal(struct); + Set projectedIds = getIdsInternal(struct, false); projectedIds.removeAll(fieldIds); return project(struct, projectedIds); } public static Schema selectNot(Schema schema, Set fieldIds) { - Set projectedIds = getIdsInternal(schema.asStruct()); + Set projectedIds = getIdsInternal(schema.asStruct(), false); projectedIds.removeAll(fieldIds); return project(schema, projectedIds); } diff --git a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java index a3da28bc31f4..210efd352f5b 100644 --- a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java +++ b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java @@ -452,4 +452,34 @@ public void testValidateSchemaViaIndexByName() { TypeUtil.indexByName(Types.StructType.of(nestedType)); } + + @Test + public void testSelectNot() { + Schema schema = new Schema( + Lists.newArrayList( + required(1, "id", Types.LongType.get()), + required(2, "location", Types.StructType.of( + required(3, "lat", Types.DoubleType.get()), + required(4, "long", Types.DoubleType.get()) + )))); + + Schema expectedNoPrimitive = new Schema( + Lists.newArrayList( + required(2, "location", Types.StructType.of( + required(3, "lat", Types.DoubleType.get()), + required(4, "long", Types.DoubleType.get()) + )))); + + Schema actualNoPrimitve = TypeUtil.selectNot(schema, Sets.newHashSet(1)); + Assert.assertEquals(expectedNoPrimitive.asStruct(), actualNoPrimitve.asStruct()); + + // Expected legacy behavior is to completely remove structs if their elements are removed + Schema expectedNoStructElements = new Schema(required(1, "id", Types.LongType.get())); + Schema actualNoStructElements = TypeUtil.selectNot(schema, Sets.newHashSet(3, 4)); + Assert.assertEquals(expectedNoStructElements.asStruct(), actualNoStructElements.asStruct()); + + // Expected legacy behavior is to ignore selectNot on struct elements. + Schema actualNoStruct = TypeUtil.selectNot(schema, Sets.newHashSet(2)); + Assert.assertEquals(schema.asStruct(), actualNoStruct.asStruct()); + } } diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java index 36c2fd75fe82..660d5129903a 100644 --- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java +++ b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java @@ -19,12 +19,14 @@ package org.apache.iceberg.avro; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import org.apache.avro.JsonProperties; import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; import org.apache.avro.SchemaNormalization; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -82,7 +84,7 @@ public Schema record(Schema record, List names, List fields) { Schema fieldSchema = fields.get(field.pos()); // All primitives are selected by selecting the field, but map and list // types can be selected by projecting the keys, values, or elements. Empty - // Structs can be selected by selecting the record itself instead of it's children. + // Structs can be selected by selecting the record itself instead of its children. // This creates two conditions where the field should be selected: if the // id is selected or if the result of the field is non-null. The only // case where the converted field is non-null is when a map or list is @@ -91,7 +93,11 @@ public Schema record(Schema record, List names, List fields) { if (fieldSchema != null) { filteredFields.add(copyField(field, fieldSchema, fieldId)); } else { - filteredFields.add(copyField(field, field.schema(), fieldId)); + if (isRecord(field.schema())) { + filteredFields.add(copyField(field, makeEmptyCopy(field.schema()), fieldId)); + } else { + filteredFields.add(copyField(field, field.schema(), fieldId)); + } } } else if (fieldSchema != null) { hasChange = true; @@ -264,6 +270,26 @@ private static Schema copyRecord(Schema record, List newFields) { return copy; } + private boolean isRecord(Schema field) { + if (AvroSchemaUtil.isOptionSchema(field)) { + return AvroSchemaUtil.fromOption(field).getType().equals(Type.RECORD); + } else { + return field.getType().equals(Type.RECORD); + } + } + + private static Schema makeEmptyCopy(Schema field) { + if (AvroSchemaUtil.isOptionSchema(field)) { + Schema innerSchema = AvroSchemaUtil.fromOption(field); + Schema emptyRecord = Schema.createRecord(innerSchema.getName(), innerSchema.getDoc(), innerSchema.getNamespace(), + innerSchema.isError(), Collections.emptyList()); + return AvroSchemaUtil.toOption(emptyRecord); + } else { + return Schema.createRecord(field.getName(), field.getDoc(), field.getNamespace(), field.isError(), + Collections.emptyList()); + } + } + private static Schema.Field copyField(Schema.Field field, Schema newSchema, Integer fieldId) { Schema newSchemaReordered; // if the newSchema is an optional schema, make sure the NULL option is always the first diff --git a/core/src/test/java/org/apache/iceberg/avro/TestReadProjection.java b/core/src/test/java/org/apache/iceberg/avro/TestReadProjection.java index bcb70026a0aa..6c86155a0acd 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestReadProjection.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestReadProjection.java @@ -37,11 +37,10 @@ import org.junit.rules.TemporaryFolder; public abstract class TestReadProjection { - protected abstract Record writeAndRead(String desc, - Schema writeSchema, - Schema readSchema, - Record record) throws IOException; + Schema writeSchema, + Schema readSchema, + Record record) throws IOException; @Rule public TemporaryFolder temp = new TemporaryFolder(); From 2b7da89c261fbc49c2d954c7c5a3f01b36e4d30a Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Thu, 16 Sep 2021 15:44:03 -0500 Subject: [PATCH 6/7] Copy Avro/Iceberg Prune behavior in Parquet --- .../apache/iceberg/parquet/PruneColumns.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java index f963b3446515..267f96a886a4 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java @@ -19,12 +19,14 @@ package org.apache.iceberg.parquet; +import java.util.Collections; import java.util.List; import java.util.Set; import org.apache.iceberg.relocated.com.google.common.base.Objects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; @@ -53,7 +55,11 @@ public Type message(MessageType message, List fields) { hasChange = true; builder.addField(field); } else { - builder.addField(originalField); + if (isStruct(originalField)) { + builder.addField(originalField.asGroupType().withNewFields(Collections.emptyList())); + } else { + builder.addField(originalField); + } } fieldCount += 1; } else if (field != null) { @@ -157,4 +163,15 @@ public Type primitive(PrimitiveType primitive) { private Integer getId(Type type) { return type.getId() == null ? null : type.getId().intValue(); } + + private boolean isStruct(Type field) { + if (field.isPrimitive()) { + return false; + } else { + GroupType groupType = field.asGroupType(); + LogicalTypeAnnotation logicalTypeAnnotation = groupType.getLogicalTypeAnnotation(); + return !logicalTypeAnnotation.equals(LogicalTypeAnnotation.mapType()) && + !logicalTypeAnnotation.equals(LogicalTypeAnnotation.listType()); + } + } } From 38b627d63d8443c312722c6a6a1aabd98b903712 Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Mon, 20 Sep 2021 13:21:51 -0500 Subject: [PATCH 7/7] Fix HasChange behavior --- api/src/main/java/org/apache/iceberg/types/TypeUtil.java | 8 ++------ .../main/java/org/apache/iceberg/avro/PruneColumns.java | 4 +++- .../java/org/apache/iceberg/parquet/PruneColumns.java | 3 ++- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java index 2bf16a460e5e..80515fdececa 100644 --- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java +++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java @@ -115,24 +115,20 @@ public static Types.StructType select(Types.StructType struct, Set fiel } public static Set getProjectedIds(Schema schema) { - return ImmutableSet.copyOf(getIdsInternal(schema.asStruct())); + return ImmutableSet.copyOf(getIdsInternal(schema.asStruct(), true)); } public static Set getProjectedIds(Type type) { if (type.isPrimitiveType()) { return ImmutableSet.of(); } - return ImmutableSet.copyOf(getIdsInternal(type)); + return ImmutableSet.copyOf(getIdsInternal(type, true)); } private static Set getIdsInternal(Type type, boolean includeStructIds) { return visit(type, new GetProjectedIds(includeStructIds)); } - private static Set getIdsInternal(Type type) { - return getIdsInternal(type, true); - } - public static Types.StructType selectNot(Types.StructType struct, Set fieldIds) { Set projectedIds = getIdsInternal(struct, false); projectedIds.removeAll(fieldIds); diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java index 660d5129903a..91089c3b6714 100644 --- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java +++ b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java @@ -91,16 +91,18 @@ public Schema record(Schema record, List names, List fields) { // selected by lower IDs. if (selectedIds.contains(fieldId)) { if (fieldSchema != null) { + hasChange = true; // Sub-fields may be different filteredFields.add(copyField(field, fieldSchema, fieldId)); } else { if (isRecord(field.schema())) { + hasChange = true; // Sub-fields are now empty filteredFields.add(copyField(field, makeEmptyCopy(field.schema()), fieldId)); } else { filteredFields.add(copyField(field, field.schema(), fieldId)); } } } else if (fieldSchema != null) { - hasChange = true; + hasChange = true; // Sub-fields may be different filteredFields.add(copyField(field, fieldSchema, fieldId)); } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java index 267f96a886a4..843aafb2151f 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java @@ -56,6 +56,7 @@ public Type message(MessageType message, List fields) { builder.addField(field); } else { if (isStruct(originalField)) { + hasChange = true; builder.addField(originalField.asGroupType().withNewFields(Collections.emptyList())); } else { builder.addField(originalField); @@ -63,9 +64,9 @@ public Type message(MessageType message, List fields) { } fieldCount += 1; } else if (field != null) { + hasChange = true; builder.addField(field); fieldCount += 1; - hasChange = true; } }