diff --git a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java
index a6a55a5fae7a..0181b98ac8ea 100644
--- a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java
+++ b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java
@@ -108,6 +108,35 @@ private int assignFieldId() {
     return lastAssignedPartitionId;
   }
 
+  /**
+   * In V2, searches for a similar partition field in the table's historical partition specs, matching on source
+   * field ID, transform type and target name (if specified). If none is found, or on V1 tables, creates a new PartitionField.
+   * @param sourceTransform pair of source ID and transform for this PartitionField addition
+   * @param name target partition field name, if specified
+   * @return the recycled or newly created partition field
+   */
+  private PartitionField recycleOrCreatePartitionField(Pair<Integer, Transform<?, ?>> sourceTransform, String name) {
+    if (formatVersion == 2 && base != null) {
+      int sourceId = sourceTransform.first();
+      Transform<?, ?> transform = sourceTransform.second();
+
+      Set<PartitionField> allHistoricalFields = Sets.newHashSet();
+      for (PartitionSpec partitionSpec : base.specs()) {
+        allHistoricalFields.addAll(partitionSpec.fields());
+      }
+
+      for (PartitionField field : allHistoricalFields) {
+        if (field.sourceId() == sourceId && field.transform().equals(transform)) {
+          // if a target name is specified then it must match as well
+          if (name == null || field.name().equals(name)) {
+            return field;
+          }
+        }
+      }
+    }
+    return new PartitionField(sourceTransform.first(), assignFieldId(), name, sourceTransform.second());
+  }
+
   @Override
   public UpdatePartitionSpec caseSensitive(boolean isCaseSensitive) {
     this.caseSensitive = isCaseSensitive;
@@ -157,8 +186,7 @@ public BaseUpdatePartitionSpec addField(String name, Term term) {
     Preconditions.checkArgument(added == null,
         "Cannot add duplicate partition field %s=%s, already added: %s", name, term, added);
 
-    PartitionField newField = new PartitionField(
-        sourceTransform.first(), assignFieldId(), name, sourceTransform.second());
+    PartitionField newField = recycleOrCreatePartitionField(sourceTransform, name);
     if (newField.name() == null) {
       String partitionName = PartitionSpecVisitor.visit(schema, newField, PartitionNameGenerator.INSTANCE);
       newField = new PartitionField(newField.sourceId(), newField.fieldId(), partitionName, newField.transform());
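Note (illustrative, not part of the patch): the recycling behavior above can be seen end-to-end with a short sketch. The class and method names are hypothetical, and the sketch assumes a handle to an existing format-version-2 `Table` whose current spec is `identity("a")`.

```java
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Table;

class RecycleSketch {
  // Assumes `table` is a format-version-2 table currently partitioned by identity("a").
  static void verifyRecycledFieldId(Table table) {
    int originalFieldId = table.spec().fields().get(0).fieldId();

    // Drop the partition field, then re-add it from the same source column.
    table.updateSpec().removeField("a").commit();
    table.updateSpec().addField("a").commit();

    // recycleOrCreatePartitionField matches the historical field on source ID and
    // transform, so the re-added field keeps its original field ID and name rather
    // than being assigned a freshly incremented one.
    PartitionField reAdded = table.spec().fields().get(0);
    assert reAdded.fieldId() == originalFieldId;
  }
}
```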
diff --git a/core/src/test/java/org/apache/iceberg/ScanTestBase.java b/core/src/test/java/org/apache/iceberg/ScanTestBase.java
index a800e4032997..72136477da33 100644
--- a/core/src/test/java/org/apache/iceberg/ScanTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/ScanTestBase.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.concurrent.Executors;
@@ -28,6 +29,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -133,4 +135,63 @@ public void testTableScanWithPlanExecutor() {
     Assert.assertEquals(2, Iterables.size(scan.planFiles()));
     Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0);
   }
+
+  @Test
+  public void testReAddingPartitionField() throws Exception {
+    Assume.assumeTrue(formatVersion == 2);
+    Schema schema = new Schema(
+        required(1, "a", Types.IntegerType.get()),
+        required(2, "b", Types.StringType.get()),
+        required(3, "data", Types.IntegerType.get())
+    );
+    PartitionSpec initialSpec = PartitionSpec.builderFor(schema).identity("a").build();
+    File dir = temp.newFolder();
+    dir.delete();
+    this.table = TestTables.create(dir, "test_part_evolution", schema, initialSpec, formatVersion);
+    table.newFastAppend().appendFile(DataFiles.builder(initialSpec)
+        .withPath("/path/to/data/a.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("a=1")
+        .withRecordCount(1)
+        .build()).commit();
+
+    table.updateSpec().addField("b").removeField("a").commit();
+    table.newFastAppend().appendFile(DataFiles.builder(table.spec())
+        .withPath("/path/to/data/b.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("b=1")
+        .withRecordCount(1)
+        .build()).commit();
+
+    table.updateSpec().addField("a").commit();
+    table.newFastAppend().appendFile(DataFiles.builder(table.spec())
+        .withPath("/path/to/data/ab.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("b=1/a=1")
+        .withRecordCount(1)
+        .build()).commit();
+
+    table.newFastAppend().appendFile(DataFiles.builder(table.spec())
+        .withPath("/path/to/data/a2b.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("b=1/a=2")
+        .withRecordCount(1)
+        .build()).commit();
+
+    TableScan scan1 = table.newScan().filter(Expressions.equal("b", "1"));
+    try (CloseableIterable<CombinedScanTask> tasks = scan1.planTasks()) {
+      Assert.assertTrue("There should be 1 combined task", Iterables.size(tasks) == 1);
+      for (CombinedScanTask combinedScanTask : tasks) {
+        Assert.assertEquals("All 4 files should match b=1 filter", 4, combinedScanTask.files().size());
+      }
+    }
+
+    TableScan scan2 = table.newScan().filter(Expressions.equal("a", 2));
+    try (CloseableIterable<CombinedScanTask> tasks = scan2.planTasks()) {
+      Assert.assertTrue("There should be 1 combined task", Iterables.size(tasks) == 1);
+      for (CombinedScanTask combinedScanTask : tasks) {
+        Assert.assertEquals("a=2 and file without a in spec should match", 2, combinedScanTask.files().size());
+      }
+    }
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index ee00bb8d801f..a357487ea1ba 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -514,7 +514,11 @@ public void testFilesTableScanWithDroppedPartition() throws IOException {
     table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
     table.refresh();
 
-    table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
+    // Here we need to specify the target name as 'data_bucket_16'; if unspecified, a default name would be
+    // generated. As per https://github.com/apache/iceberg/pull/4868, in V2 the partition field removed above would
+    // otherwise be recycled under its old name 'data_bucket' instead of a new field being created. By specifying
+    // the target name we explicitly require that 'data_bucket' is not recycled.
+    table.updateSpec().addField("data_bucket_16", Expressions.bucket("data", 16)).commit();
     table.refresh();
 
     table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
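Note (illustrative, not part of the patch): a hedged sketch of the naming subtlety called out in the comment above, assuming a V2 `table` whose historical specs contain a field named `data_bucket` for `bucket(data, 16)`; the class and method names are hypothetical, and the two methods are alternatives rather than a sequence.

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;

class NamingSketch {
  // Recycles: without an explicit name, matching considers only source ID and
  // transform, so the removed "data_bucket" field from an older spec is reused.
  static void reAddWithDefaultName(Table table) {
    table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
  }

  // Does not recycle: an explicit target name must also match, and "data_bucket_16"
  // differs from the historical "data_bucket", so a new PartitionField with a
  // fresh field ID is created.
  static void reAddWithExplicitName(Table table) {
    table.updateSpec().addField("data_bucket_16", Expressions.bucket("data", 16)).commit();
  }
}
```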
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
index 7519486bc5c7..691e9f6f5481 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
@@ -482,11 +482,6 @@ public void testPartitionsTableRenameFields() throws ParseException {
   @Test
   public void testPartitionsTableSwitchFields() throws Exception {
-    // Re-added partition fields currently not re-associated: https://github.com/apache/iceberg/issues/4292
-    // In V1, dropped partition fields show separately when field is re-added
-    // In V2, re-added field currently conflicts with its deleted form
-    Assume.assumeTrue(formatVersion == 1);
-
     sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
     initTable();
 
     Table table = validationCatalog.loadTable(tableIdent);
@@ -531,17 +526,34 @@ public void testPartitionsTableSwitchFields() throws Exception {
 
     sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
     sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (3, 'c3', 'd3')", tableName);
 
-    assertPartitions(
-        ImmutableList.of(
-            row(null, "c1", null),
-            row(null, "c1", "d1"),
-            row(null, "c2", null),
-            row(null, "c2", "d2"),
-            row("d1", "c1", null),
-            row("d2", "c2", null)),
-        "STRUCT<data:STRING,category:STRING,data_1:STRING>",
-        PARTITIONS);
+    if (formatVersion == 1) {
+      assertPartitions(
+          ImmutableList.of(
+              row(null, "c1", null),
+              row(null, "c1", "d1"),
+              row(null, "c2", null),
+              row(null, "c2", "d2"),
+              row(null, "c3", "d3"),
+              row("d1", "c1", null),
+              row("d2", "c2", null)),
+          "STRUCT<data:STRING,category:STRING,data_1:STRING>",
+          PARTITIONS);
+    } else {
+      // In V2, re-adding a former partition field that was part of an older spec changes neither its name nor its
+      // field ID, so values are collapsed into a single common column (as opposed to V1, where any new partition
+      // field addition results in a new column in this metadata table)
+      assertPartitions(
+          ImmutableList.of(
+              row(null, "c1"),
+              row(null, "c2"),
+              row("d1", "c1"),
+              row("d2", "c2"),
+              row("d3", "c3")),
+          "STRUCT<data:STRING,category:STRING>",
+          PARTITIONS);
+    }
   }
 
   @Test
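Note (illustrative, not part of the patch): the V2 collapse asserted above can also be observed by querying the partitions metadata table directly; the class name, `tableName`, and the `SparkSession` handle are assumptions for illustration.

```java
import java.util.List;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class PartitionsMetadataSketch {
  // After the switch / remove / re-add sequence in the test above, a V2 table reports
  // a two-column partition struct (data, category) because the re-added field reuses
  // its old field ID, while a V1 table reports a third column (data_1) for the
  // re-added field.
  static List<Row> partitionRows(SparkSession spark, String tableName) {
    return spark.sql("SELECT partition FROM " + tableName + ".partitions").collectAsList();
  }
}
```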