diff --git a/api/src/main/java/org/apache/iceberg/types/GetProjectedIds.java b/api/src/main/java/org/apache/iceberg/types/GetProjectedIds.java
index d50c45e22e6b..985663bf224e 100644
--- a/api/src/main/java/org/apache/iceberg/types/GetProjectedIds.java
+++ b/api/src/main/java/org/apache/iceberg/types/GetProjectedIds.java
@@ -25,8 +25,17 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 
 class GetProjectedIds extends TypeUtil.SchemaVisitor<Set<Integer>> {
+  private final boolean includeStructIds;
   private final Set<Integer> fieldIds = Sets.newHashSet();
 
+  GetProjectedIds() {
+    this(false);
+  }
+
+  GetProjectedIds(boolean includeStructIds) {
+    this.includeStructIds = includeStructIds;
+  }
+
   @Override
   public Set<Integer> schema(Schema schema, Set<Integer> structResult) {
     return fieldIds;
@@ -39,7 +48,7 @@ public Set<Integer> struct(Types.StructType struct, List<Set<Integer>> fieldResults) {
 
   @Override
   public Set<Integer> field(Types.NestedField field, Set<Integer> fieldResult) {
-    if (fieldResult == null) {
+    if ((includeStructIds && field.type().isStructType()) || field.type().isPrimitiveType()) {
       fieldIds.add(field.fieldId());
     }
     return fieldIds;
diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
index e93b10afa818..80515fdececa 100644
--- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -115,30 +115,30 @@ public static Types.StructType select(Types.StructType struct, Set<Integer> fieldIds) {
   }
 
   public static Set<Integer> getProjectedIds(Schema schema) {
-    return ImmutableSet.copyOf(getIdsInternal(schema.asStruct()));
+    return ImmutableSet.copyOf(getIdsInternal(schema.asStruct(), true));
   }
 
   public static Set<Integer> getProjectedIds(Type type) {
     if (type.isPrimitiveType()) {
       return ImmutableSet.of();
     }
-    return ImmutableSet.copyOf(getIdsInternal(type));
+    return ImmutableSet.copyOf(getIdsInternal(type, true));
   }
 
-  private static Set<Integer> getIdsInternal(Type type) {
-    return visit(type, new GetProjectedIds());
+  private static Set<Integer> getIdsInternal(Type type, boolean includeStructIds) {
+    return visit(type, new GetProjectedIds(includeStructIds));
   }
 
   public static Types.StructType selectNot(Types.StructType struct, Set<Integer> fieldIds) {
-    Set<Integer> projectedIds = getIdsInternal(struct);
+    Set<Integer> projectedIds = getIdsInternal(struct, false);
     projectedIds.removeAll(fieldIds);
-    return select(struct, projectedIds);
+    return project(struct, projectedIds);
   }
 
   public static Schema selectNot(Schema schema, Set<Integer> fieldIds) {
-    Set<Integer> projectedIds = getIdsInternal(schema.asStruct());
+    Set<Integer> projectedIds = getIdsInternal(schema.asStruct(), false);
     projectedIds.removeAll(fieldIds);
-    return select(schema, projectedIds);
+    return project(schema, projectedIds);
   }
 
   public static Schema join(Schema left, Schema right) {
diff --git a/api/src/main/java/org/apache/iceberg/util/StructProjection.java b/api/src/main/java/org/apache/iceberg/util/StructProjection.java
index be05b0fe2db5..1bc10ac3755c 100644
--- a/api/src/main/java/org/apache/iceberg/util/StructProjection.java
+++ b/api/src/main/java/org/apache/iceberg/util/StructProjection.java
@@ -42,7 +42,7 @@ public class StructProjection implements StructLike {
    */
  public static StructProjection create(Schema schema, Set<Integer> ids) {
    StructType structType = schema.asStruct();
-    return new StructProjection(structType, TypeUtil.select(structType, ids));
+    return new StructProjection(structType, TypeUtil.project(structType, ids));
  }
 
   /**
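The three changes above reroute id-based projection from TypeUtil.select to TypeUtil.project, which is what allows a struct to be projected without any of its children. A minimal sketch of the intended difference, under the semantics this patch tests elsewhere (ProjectionSketch and the schema below are illustrative, not part of the patch):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.relocated.com.google.common.collect.Sets;
    import org.apache.iceberg.types.TypeUtil;
    import org.apache.iceberg.types.Types;
    import static org.apache.iceberg.types.Types.NestedField.required;

    public class ProjectionSketch {
      public static void main(String[] args) {
        Schema schema = new Schema(
            required(1, "id", Types.LongType.get()),
            required(2, "location", Types.StructType.of(
                required(3, "lat", Types.DoubleType.get()),
                required(4, "long", Types.DoubleType.get()))));

        // select() pulls in a selected struct's entire subtree
        System.out.println(TypeUtil.select(schema, Sets.newHashSet(2)));
        // project() keeps only explicitly selected ids, so naming just the
        // struct's own id yields an empty struct -- the case enabled here
        System.out.println(TypeUtil.project(schema, Sets.newHashSet(2)));
      }
    }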
diff --git a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
index f9d9ef13e565..210efd352f5b 100644
--- a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
+++ b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
@@ -20,6 +20,7 @@
 
 package org.apache.iceberg.types;
 
+import java.util.Set;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -323,48 +324,24 @@ public void testProjectMap() {
   }
 
   @Test
-  public void testProjectList() {
+  public void testGetProjectedIds() {
     Schema schema = new Schema(
         Lists.newArrayList(
             required(10, "a", Types.IntegerType.get()),
             required(11, "A", Types.IntegerType.get()),
-            required(12, "list", Types.ListType.ofRequired(13,
-                Types.StructType.of(
-                    optional(20, "foo", Types.IntegerType.get()),
-                    required(21, "subList", Types.ListType.ofRequired(14,
-                        Types.StructType.of(
-                            required(15, "x", Types.IntegerType.get()),
-                            required(16, "y", Types.IntegerType.get()),
-                            required(17, "z", Types.IntegerType.get()))))))))));
-
-    AssertHelpers.assertThrows("Cannot explicitly project List",
-        IllegalArgumentException.class,
-        () -> TypeUtil.project(schema, Sets.newHashSet(12))
-    );
-
-    AssertHelpers.assertThrows("Cannot explicitly project List",
-        IllegalArgumentException.class,
-        () -> TypeUtil.project(schema, Sets.newHashSet(21))
-    );
+            required(35, "emptyStruct", Types.StructType.of()),
+            required(12, "someStruct", Types.StructType.of(
+                required(13, "b", Types.IntegerType.get()),
+                required(14, "B", Types.IntegerType.get()),
+                required(15, "anotherStruct", Types.StructType.of(
+                    required(16, "c", Types.IntegerType.get()),
+                    required(17, "C", Types.IntegerType.get())))))));
 
-    Schema expectedDepthOne = new Schema(
-        Lists.newArrayList(
-            required(12, "list", Types.ListType.ofRequired(13,
-                Types.StructType.of()))));
-    Schema actualDepthOne = TypeUtil.project(schema, Sets.newHashSet(13));
-    Assert.assertEquals(expectedDepthOne.asStruct(), actualDepthOne.asStruct());
+    Set<Integer> expectedIds = Sets.newHashSet(10, 11, 35, 12, 13, 14, 15, 16, 17);
+    Set<Integer> actualIds = TypeUtil.getProjectedIds(schema);
 
-    Schema expectedDepthTwo = new Schema(
-        Lists.newArrayList(
-            required(10, "a", Types.IntegerType.get()),
-            required(12, "list", Types.ListType.ofRequired(13,
-                Types.StructType.of(
-                    optional(20, "foo", Types.IntegerType.get()),
-                    required(21, "subList", Types.ListType.ofRequired(14,
-                        Types.StructType.of())))))));
-    Schema actualDepthTwo = TypeUtil.project(schema, Sets.newHashSet(10, 13, 20, 14));
-    Assert.assertEquals(expectedDepthTwo.asStruct(), actualDepthTwo.asStruct());
+    Assert.assertEquals(expectedIds, actualIds);
   }
 
   @Test
@@ -475,4 +452,34 @@ public void testValidateSchemaViaIndexByName() {
 
     TypeUtil.indexByName(Types.StructType.of(nestedType));
   }
+
+  @Test
+  public void testSelectNot() {
+    Schema schema = new Schema(
+        Lists.newArrayList(
+            required(1, "id", Types.LongType.get()),
+            required(2, "location", Types.StructType.of(
+                required(3, "lat", Types.DoubleType.get()),
+                required(4, "long", Types.DoubleType.get())
+            ))));
+
+    Schema expectedNoPrimitive = new Schema(
+        Lists.newArrayList(
+            required(2, "location", Types.StructType.of(
+                required(3, "lat", Types.DoubleType.get()),
+                required(4, "long", Types.DoubleType.get())
+            ))));
+
+    Schema actualNoPrimitive = TypeUtil.selectNot(schema, Sets.newHashSet(1));
+    Assert.assertEquals(expectedNoPrimitive.asStruct(), actualNoPrimitive.asStruct());
+
+    // Expected legacy behavior is to completely remove structs if their elements are removed
+    Schema expectedNoStructElements = new Schema(required(1, "id", Types.LongType.get()));
+    Schema actualNoStructElements = TypeUtil.selectNot(schema, Sets.newHashSet(3, 4));
+    Assert.assertEquals(expectedNoStructElements.asStruct(), actualNoStructElements.asStruct());
+
+    // Expected legacy behavior is to ignore selectNot on struct elements.
+    Schema actualNoStruct = TypeUtil.selectNot(schema, Sets.newHashSet(2));
+    Assert.assertEquals(schema.asStruct(), actualNoStruct.asStruct());
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 356d909f6bba..524276b57427 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -296,7 +296,7 @@ private Schema lazyColumnProjection() {
       }
       requiredFieldIds.addAll(selectedIds);
 
-      return TypeUtil.select(schema, requiredFieldIds);
+      return TypeUtil.project(schema, requiredFieldIds);
 
     } else if (context.projectedSchema() != null) {
       return context.projectedSchema();
diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java
index 57e2c2709137..91089c3b6714 100644
--- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java
+++ b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java
@@ -19,12 +19,14 @@
 
 package org.apache.iceberg.avro;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import org.apache.avro.JsonProperties;
 import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
 import org.apache.avro.SchemaNormalization;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -81,15 +83,26 @@ public Schema record(Schema record, List<String> names, List<Schema> fields) {
       Schema fieldSchema = fields.get(field.pos());
       // All primitives are selected by selecting the field, but map and list
-      // types can be selected by projecting the keys, values, or elements.
+      // types can be selected by projecting the keys, values, or elements. Empty
+      // structs can be selected by selecting the record itself instead of its children.
       // This creates two conditions where the field should be selected: if the
       // id is selected or if the result of the field is non-null. The only
       // case where the converted field is non-null is when a map or list is
       // selected by lower IDs.
       if (selectedIds.contains(fieldId)) {
-        filteredFields.add(copyField(field, field.schema(), fieldId));
+        if (fieldSchema != null) {
+          hasChange = true; // Sub-fields may be different
+          filteredFields.add(copyField(field, fieldSchema, fieldId));
+        } else {
+          if (isRecord(field.schema())) {
+            hasChange = true; // Sub-fields are now empty
+            filteredFields.add(copyField(field, makeEmptyCopy(field.schema()), fieldId));
+          } else {
+            filteredFields.add(copyField(field, field.schema(), fieldId));
+          }
+        }
       } else if (fieldSchema != null) {
-        hasChange = true;
+        hasChange = true; // Sub-fields may be different
         filteredFields.add(copyField(field, fieldSchema, fieldId));
       }
     }
@@ -259,6 +272,26 @@ private static Schema copyRecord(Schema record, List<Schema.Field> newFields) {
     return copy;
   }
 
+  private boolean isRecord(Schema field) {
+    if (AvroSchemaUtil.isOptionSchema(field)) {
+      return AvroSchemaUtil.fromOption(field).getType().equals(Type.RECORD);
+    } else {
+      return field.getType().equals(Type.RECORD);
+    }
+  }
+
+  private static Schema makeEmptyCopy(Schema field) {
+    if (AvroSchemaUtil.isOptionSchema(field)) {
+      Schema innerSchema = AvroSchemaUtil.fromOption(field);
+      Schema emptyRecord = Schema.createRecord(innerSchema.getName(), innerSchema.getDoc(), innerSchema.getNamespace(),
+          innerSchema.isError(), Collections.emptyList());
+      return AvroSchemaUtil.toOption(emptyRecord);
+    } else {
+      return Schema.createRecord(field.getName(), field.getDoc(), field.getNamespace(), field.isError(),
+          Collections.emptyList());
+    }
+  }
+
   private static Schema.Field copyField(Schema.Field field, Schema newSchema, Integer fieldId) {
     Schema newSchemaReordered;
     // if the newSchema is an optional schema, make sure the NULL option is always the first
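makeEmptyCopy above rebuilds a record schema with zero fields while preserving its identity, unwrapping and rewrapping optional (union-with-null) schemas as needed. A standalone sketch of the same idea against the plain Avro API (EmptyCopySketch and the location schema are illustrative):

    import java.util.Collections;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class EmptyCopySketch {
      public static void main(String[] args) {
        Schema original = SchemaBuilder.record("location").namespace("table")
            .fields()
            .requiredFloat("lat")
            .requiredFloat("long")
            .endRecord();

        // Same name, doc, namespace, and error flag, but no fields: readers
        // still see the struct itself even though every child is pruned away
        Schema empty = Schema.createRecord(original.getName(), original.getDoc(),
            original.getNamespace(), original.isError(), Collections.emptyList());

        System.out.println(empty.getFields().size());  // 0
      }
    }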
diff --git a/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java b/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java
index 4aa5bfd335ec..4b0bed1cde48 100644
--- a/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java
+++ b/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java
@@ -93,7 +93,7 @@ public void testDeleteFields() {
       Schema del = new SchemaUpdate(SCHEMA, 19).deleteColumn(name).apply();
 
       Assert.assertEquals("Should match projection with '" + name + "' removed",
-          TypeUtil.select(SCHEMA, selected).asStruct(), del.asStruct());
+          TypeUtil.project(SCHEMA, selected).asStruct(), del.asStruct());
     }
   }
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestReadProjection.java b/core/src/test/java/org/apache/iceberg/avro/TestReadProjection.java
index e71034483bdc..6c86155a0acd 100644
--- a/core/src/test/java/org/apache/iceberg/avro/TestReadProjection.java
+++ b/core/src/test/java/org/apache/iceberg/avro/TestReadProjection.java
@@ -526,4 +526,183 @@ public void testListOfStructsProjection() throws IOException {
     AssertHelpers.assertEmptyAvroField(projectedP2, "y");
     Assert.assertNull("Should project null z", projectedP2.get("z"));
   }
+
+  @Test
+  public void testEmptyStructProjection() throws Exception {
+    Schema writeSchema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(3, "location", Types.StructType.of(
+            Types.NestedField.required(1, "lat", Types.FloatType.get()),
+            Types.NestedField.required(2, "long", Types.FloatType.get())
+        ))
+    );
+
+    Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table"));
+    record.put("id", 34L);
+    Record location = new Record(
+        AvroSchemaUtil.fromOption(record.getSchema().getField("location").schema()));
+    location.put("lat", 52.995143f);
+    location.put("long", -1.539054f);
+    record.put("location", location);
+
+    Schema emptyStruct = new Schema(
+        Types.NestedField.required(3, "location", Types.StructType.of())
+    );
+
+    Record projected = writeAndRead("empty_proj", writeSchema, emptyStruct, record);
+    AssertHelpers.assertEmptyAvroField(projected, "id");
+    Record result = (Record) projected.get("location");
+
+    Assert.assertEquals("location should be in the 0th position", result, projected.get(0));
+    Assert.assertNotNull("Should contain an empty record", result);
+    AssertHelpers.assertEmptyAvroField(result, "lat");
+    AssertHelpers.assertEmptyAvroField(result, "long");
+  }
+
+  @Test
+  public void testEmptyStructRequiredProjection() throws Exception {
+    Schema writeSchema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.required(3, "location", Types.StructType.of(
+            Types.NestedField.required(1, "lat", Types.FloatType.get()),
+            Types.NestedField.required(2, "long", Types.FloatType.get())
+        ))
+    );
+
+    Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table"));
+    record.put("id", 34L);
+    Record location = new Record(record.getSchema().getField("location").schema());
+    location.put("lat", 52.995143f);
+    location.put("long", -1.539054f);
+    record.put("location", location);
+
+    Schema emptyStruct = new Schema(
+        Types.NestedField.required(3, "location", Types.StructType.of())
+    );
+
+    Record projected = writeAndRead("empty_req_proj", writeSchema, emptyStruct, record);
+    AssertHelpers.assertEmptyAvroField(projected, "id");
+    Record result = (Record) projected.get("location");
+    Assert.assertEquals("location should be in the 0th position", result, projected.get(0));
+    Assert.assertNotNull("Should contain an empty record", result);
+    AssertHelpers.assertEmptyAvroField(result, "lat");
+    AssertHelpers.assertEmptyAvroField(result, "long");
+  }
+
+  @Test
+  public void testRequiredEmptyStructInRequiredStruct() throws Exception {
+    Schema writeSchema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.required(3, "location", Types.StructType.of(
+            Types.NestedField.required(1, "lat", Types.FloatType.get()),
+            Types.NestedField.required(2, "long", Types.FloatType.get()),
+            Types.NestedField.required(4, "empty", Types.StructType.of())
+        ))
+    );
+
+    Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table"));
+    record.put("id", 34L);
+    Record location = new Record(record.getSchema().getField("location").schema());
+    location.put("lat", 52.995143f);
+    location.put("long", -1.539054f);
+    record.put("location", location);
+
+    Schema emptyStruct = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.required(3, "location", Types.StructType.of(
+            Types.NestedField.required(4, "empty", Types.StructType.of())
+        ))
+    );
+
+    Record projected = writeAndRead("req_empty_req_proj", writeSchema, emptyStruct, record);
+    Assert.assertEquals("Should project id", 34L, projected.get("id"));
+    Record result = (Record) projected.get("location");
+    Assert.assertEquals("location should be in the 1st position", result, projected.get(1));
+    Assert.assertNotNull("Should contain an empty record", result);
+    AssertHelpers.assertEmptyAvroField(result, "lat");
+    AssertHelpers.assertEmptyAvroField(result, "long");
+    Assert.assertNotNull("Should project empty", result.getSchema().getField("empty"));
+    Assert.assertNotNull("Empty should not be null", result.get("empty"));
+    Assert.assertEquals("Empty should be empty", 0,
result.get("empty")).getSchema().getFields().size()); + } + + @Test + public void testEmptyNestedStructProjection() throws Exception { + Schema writeSchema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(3, "outer", Types.StructType.of( + Types.NestedField.required(1, "lat", Types.FloatType.get()), + Types.NestedField.optional(2, "inner", Types.StructType.of( + Types.NestedField.required(5, "lon", Types.FloatType.get()) + ) + ) + )) + ); + + Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table")); + record.put("id", 34L); + Record outer = new Record( + AvroSchemaUtil.fromOption(record.getSchema().getField("outer").schema())); + Record inner = new Record(AvroSchemaUtil.fromOption(outer.getSchema().getField("inner").schema())); + inner.put("lon", 32.14f); + outer.put("lat", 52.995143f); + outer.put("inner", inner); + record.put("outer", outer); + + Schema emptyStruct = new Schema( + Types.NestedField.required(3, "outer", Types.StructType.of( + Types.NestedField.required(2, "inner", Types.StructType.of()) + ))); + + Record projected = writeAndRead("nested_empty_proj", writeSchema, emptyStruct, record); + AssertHelpers.assertEmptyAvroField(projected, "id"); + Record outerResult = (Record) projected.get("outer"); + Assert.assertEquals("Outer should be in the 0th position", outerResult, projected.get(0)); + Assert.assertNotNull("Should contain the outer record", outerResult); + AssertHelpers.assertEmptyAvroField(outerResult, "lat"); + Record innerResult = (Record) outerResult.get("inner"); + Assert.assertEquals("Inner should be in the 0th position", innerResult, outerResult.get(0)); + Assert.assertNotNull("Should contain the inner record", innerResult); + AssertHelpers.assertEmptyAvroField(innerResult, "lon"); + } + + @Test + public void testEmptyNestedStructRequiredProjection() throws Exception { + Schema writeSchema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.required(3, "outer", Types.StructType.of( + Types.NestedField.required(1, "lat", Types.FloatType.get()), + Types.NestedField.required(2, "inner", Types.StructType.of( + Types.NestedField.required(5, "lon", Types.FloatType.get()) + ) + ) + )) + ); + + Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table")); + record.put("id", 34L); + Record outer = new Record(record.getSchema().getField("outer").schema()); + Record inner = new Record(outer.getSchema().getField("inner").schema()); + inner.put("lon", 32.14f); + outer.put("lat", 52.995143f); + outer.put("inner", inner); + record.put("outer", outer); + + Schema emptyStruct = new Schema( + Types.NestedField.required(3, "outer", Types.StructType.of( + Types.NestedField.required(2, "inner", Types.StructType.of()) + ))); + + Record projected = writeAndRead("nested_empty_req_proj", writeSchema, emptyStruct, record); + AssertHelpers.assertEmptyAvroField(projected, "id"); + Record outerResult = (Record) projected.get("outer"); + Assert.assertEquals("Outer should be in the 0th position", outerResult, projected.get(0)); + Assert.assertNotNull("Should contain the outer record", outerResult); + AssertHelpers.assertEmptyAvroField(outerResult, "lat"); + Record innerResult = (Record) outerResult.get("inner"); + Assert.assertEquals("Inner should be in the 0th position", innerResult, outerResult.get(0)); + Assert.assertNotNull("Should contain the inner record", innerResult); + AssertHelpers.assertEmptyAvroField(innerResult, "lon"); + } } diff --git 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java
index acdda78c680b..843aafb2151f 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java
@@ -19,12 +19,14 @@
 
 package org.apache.iceberg.parquet;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import org.apache.iceberg.relocated.com.google.common.base.Objects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
@@ -49,12 +51,22 @@ public Type message(MessageType message, List<Type> fields) {
       Type field = fields.get(i);
       Integer fieldId = getId(originalField);
       if (fieldId != null && selectedIds.contains(fieldId)) {
-        builder.addField(originalField);
+        if (field != null) {
+          hasChange = true;
+          builder.addField(field);
+        } else {
+          if (isStruct(originalField)) {
+            hasChange = true;
+            builder.addField(originalField.asGroupType().withNewFields(Collections.emptyList()));
+          } else {
+            builder.addField(originalField);
+          }
+        }
         fieldCount += 1;
       } else if (field != null) {
+        hasChange = true;
         builder.addField(field);
         fieldCount += 1;
-        hasChange = true;
       }
     }
 
@@ -152,4 +164,15 @@ public Type primitive(PrimitiveType primitive) {
   private Integer getId(Type type) {
     return type.getId() == null ? null : type.getId().intValue();
   }
+
+  private boolean isStruct(Type field) {
+    if (field.isPrimitive()) {
+      return false;
+    } else {
+      GroupType groupType = field.asGroupType();
+      LogicalTypeAnnotation logicalTypeAnnotation = groupType.getLogicalTypeAnnotation();
+      // compare from the constants' side: a plain struct group carries no
+      // logical-type annotation, so the annotation itself may be null
+      return !LogicalTypeAnnotation.mapType().equals(logicalTypeAnnotation) &&
+          !LogicalTypeAnnotation.listType().equals(logicalTypeAnnotation);
+    }
+  }
 }
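Parquet represents structs, maps, and lists all as group types, so isStruct above must rule out the map and list logical-type annotations; a plain struct group carries no annotation at all, which is why the comparison starts from the annotation constants. A small sketch with parquet-mr's schema builder (StructCheckSketch and the group are illustrative):

    import org.apache.parquet.schema.GroupType;
    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Type.Repetition;
    import org.apache.parquet.schema.Types;

    public class StructCheckSketch {
      public static void main(String[] args) {
        GroupType struct = Types.buildGroup(Repetition.REQUIRED)
            .required(PrimitiveTypeName.FLOAT).named("lat")
            .named("location");

        // null for a plain struct group, hence the null-safe comparison
        LogicalTypeAnnotation annotation = struct.getLogicalTypeAnnotation();
        boolean isStruct = !LogicalTypeAnnotation.mapType().equals(annotation) &&
            !LogicalTypeAnnotation.listType().equals(annotation);
        System.out.println(isStruct);  // true
      }
    }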
"entries")) + .select("status") + .collectAsList(); + + Assert.assertEquals("Results should contain only one status", 1, actual.size()); + Assert.assertEquals("That status should be Added (1)", 1, actual.get(0).getInt(0)); + } + @Test public void testEntriesTableDataFilePrune() throws Exception { TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test"); @@ -312,7 +339,7 @@ public void testCountEntriesTable() { @Test public void testFilesTable() throws Exception { TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test"); - Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build()); + Table table = createTable(tableIdentifier, SCHEMA, SPEC); Table entriesTable = loadTable(tableIdentifier, "entries"); Table filesTable = loadTable(tableIdentifier, "files"); @@ -362,7 +389,7 @@ public void testFilesTableWithSnapshotIdInheritance() throws Exception { spark.sql("DROP TABLE IF EXISTS parquet_table"); TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_inheritance_test"); - Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build()); + Table table = createTable(tableIdentifier, SCHEMA, SPEC); table.updateProperties() .set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true") .commit(); @@ -422,7 +449,7 @@ public void testEntriesTableWithSnapshotIdInheritance() throws Exception { spark.sql("DROP TABLE IF EXISTS parquet_table"); TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_inheritance_test"); - PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build(); + PartitionSpec spec = SPEC; Table table = createTable(tableIdentifier, SCHEMA, spec); table.updateProperties() @@ -523,7 +550,7 @@ public void testFilesUnpartitionedTable() throws Exception { @Test public void testAllMetadataTablesWithStagedCommits() throws Exception { TableIdentifier tableIdentifier = TableIdentifier.of("db", "stage_aggregate_table_test"); - Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build()); + Table table = createTable(tableIdentifier, SCHEMA, SPEC); table.updateProperties().set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true").commit(); spark.conf().set("spark.wap.id", "1234567"); @@ -567,7 +594,7 @@ public void testAllMetadataTablesWithStagedCommits() throws Exception { @Test public void testAllDataFilesTable() throws Exception { TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test"); - Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build()); + Table table = createTable(tableIdentifier, SCHEMA, SPEC); Table entriesTable = loadTable(tableIdentifier, "entries"); Table filesTable = loadTable(tableIdentifier, "all_data_files"); @@ -831,7 +858,7 @@ public void testPrunedSnapshotsTable() { @Test public void testManifestsTable() { TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test"); - Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build()); + Table table = createTable(tableIdentifier, SCHEMA, SPEC); Table manifestTable = loadTable(tableIdentifier, "manifests"); Dataset df1 = spark.createDataFrame( Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(null, "b")), SimpleRecord.class); @@ -878,7 +905,7 @@ public void testManifestsTable() { @Test public void testPruneManifestsTable() { TableIdentifier tableIdentifier = TableIdentifier.of("db", 
"manifests_test"); - Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build()); + Table table = createTable(tableIdentifier, SCHEMA, SPEC); Table manifestTable = loadTable(tableIdentifier, "manifests"); Dataset df1 = spark.createDataFrame( Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(null, "b")), SimpleRecord.class); @@ -938,7 +965,7 @@ public void testPruneManifestsTable() { @Test public void testAllManifestsTable() { TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test"); - Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build()); + Table table = createTable(tableIdentifier, SCHEMA, SPEC); Table manifestTable = loadTable(tableIdentifier, "all_manifests"); Dataset df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class); @@ -1034,7 +1061,7 @@ public void testUnpartitionedPartitionsTable() { @Test public void testPartitionsTable() { TableIdentifier tableIdentifier = TableIdentifier.of("db", "partitions_test"); - Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build()); + Table table = createTable(tableIdentifier, SCHEMA, SPEC); Table partitionsTable = loadTable(tableIdentifier, "partitions"); Dataset df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class); Dataset df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);