diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 532519e5921c..3349ab2de6e8 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
@@ -41,6 +42,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
 
 /**
@@ -703,6 +705,35 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
         snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
 
+  private PartitionSpec reassignPartitionIds(PartitionSpec partitionSpec, TypeUtil.NextID nextID) {
+    if (formatVersion > 1) {
+      Map<Pair<Integer, String>, Integer> transformToFieldId = specs.stream()
+          .flatMap(spec -> spec.fields().stream())
+          .collect(Collectors.toMap(
+              field -> Pair.of(field.sourceId(), field.transform().toString()),
+              PartitionField::fieldId,
+              (n1, n2) -> n2));
+
+      PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(partitionSpec.schema())
+          .withSpecId(partitionSpec.specId());
+
+      for (PartitionField field : partitionSpec.fields()) {
+        // reassign the partition field ids
+        int fieldId = transformToFieldId.computeIfAbsent(
+            Pair.of(field.sourceId(), field.transform().toString()), k -> nextID.get());
+        specBuilder.add(
+            field.sourceId(),
+            fieldId,
+            field.name(),
+            field.transform());
+      }
+      return specBuilder.build();
+    } else {
+      // noop for v1 table
+      return partitionSpec;
+    }
+  }
+
+  // The caller is responsible for passing an updatedPartitionSpec with correct partition field IDs
   public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec updatedPartitionSpec,
                                         SortOrder updatedSortOrder, String newLocation,
@@ -718,7 +749,11 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update
     int nextSpecId = maxSpecId.orElse(TableMetadata.INITIAL_SPEC_ID) + 1;
 
     // rebuild the partition spec using the new column ids
-    PartitionSpec freshSpec = freshSpec(nextSpecId, freshSchema, updatedPartitionSpec);
+    PartitionSpec newSpec = freshSpec(nextSpecId, freshSchema, updatedPartitionSpec);
+
+    // reassign partition field ids to align with the existing partition specs in the table
+    AtomicInteger lastPartitionId = new AtomicInteger(lastAssignedPartitionId);
+    PartitionSpec freshSpec = reassignPartitionIds(newSpec, lastPartitionId::incrementAndGet);
 
     // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
     int specId = specs.stream()
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
index c44ca63a126a..a392662ca939 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -593,6 +593,74 @@ public void testInvalidUpdatePartitionSpecForV1Table() throws Exception {
         () -> metadata.updatePartitionSpec(spec));
   }
 
+  @Test
+  public void testBuildReplacementForV1Table() {
+    Schema schema = new Schema(
+        Types.NestedField.required(1, "x", Types.LongType.get()),
+        Types.NestedField.required(2, "y", Types.LongType.get())
+    );
+    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(0)
+        .identity("x")
+        .identity("y")
+        .build();
+    String location = "file://tmp/db/table";
+    TableMetadata metadata = TableMetadata.newTableMetadata(
+        schema, spec, SortOrder.unsorted(), location, ImmutableMap.of(), 1);
+    Assert.assertEquals(spec, metadata.spec());
+
+    Schema updatedSchema = new Schema(
+        Types.NestedField.required(1, "x", Types.LongType.get()),
+        Types.NestedField.required(2, "z", Types.StringType.get())
+    );
+    PartitionSpec updatedSpec = PartitionSpec.builderFor(updatedSchema).withSpecId(0)
+        .bucket("z", 8)
+        .identity("x")
+        .build();
+    TableMetadata updated = metadata.buildReplacement(
+        updatedSchema, updatedSpec, SortOrder.unsorted(), location, ImmutableMap.of());
+    PartitionSpec expected = PartitionSpec.builderFor(updated.schema()).withSpecId(1)
+        .add(3, 1000, "z_bucket", "bucket[8]")
+        .add(1, 1001, "x", "identity")
+        .build();
+    Assert.assertEquals(
+        "Should assign fresh sequential partition field IDs for a v1 table",
+        expected, updated.spec());
+  }
+
+  @Test
+  public void testBuildReplacementForV2Table() {
+    Schema schema = new Schema(
+        Types.NestedField.required(1, "x", Types.LongType.get()),
+        Types.NestedField.required(2, "y", Types.LongType.get())
+    );
+    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(0)
+        .identity("x")
+        .identity("y")
+        .build();
+    String location = "file://tmp/db/table";
+    TableMetadata metadata = TableMetadata.newTableMetadata(
+        schema, spec, SortOrder.unsorted(), location, ImmutableMap.of(), 2);
+    Assert.assertEquals(spec, metadata.spec());
+
+    Schema updatedSchema = new Schema(
+        Types.NestedField.required(1, "x", Types.LongType.get()),
+        Types.NestedField.required(2, "z", Types.StringType.get())
+    );
+    PartitionSpec updatedSpec = PartitionSpec.builderFor(updatedSchema).withSpecId(0)
+        .bucket("z", 8)
+        .identity("x")
+        .build();
+    TableMetadata updated = metadata.buildReplacement(
+        updatedSchema, updatedSpec, SortOrder.unsorted(), location, ImmutableMap.of());
+    PartitionSpec expected = PartitionSpec.builderFor(updated.schema()).withSpecId(1)
+        .add(3, 1002, "z_bucket", "bucket[8]")
+        .add(1, 1000, "x", "identity")
+        .build();
+    Assert.assertEquals(
+        "Should reassign the partition field IDs and reuse any existing IDs for equivalent fields",
+        expected, updated.spec());
+  }
+
   @Test
   public void testSortOrder() {
     Schema schema = new Schema(
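
Note (reviewer illustration, not part of the diff): a minimal standalone sketch of the field-ID reuse rule that the new reassignPartitionIds method applies to format v2 tables. It uses only plain java.util types and a hypothetical "sourceId:transform" string key; the real code keys the lookup map by Pair.of(field.sourceId(), field.transform().toString()) across all existing specs, and for v1 tables the method is a no-op so the fresh spec keeps its sequentially assigned IDs.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class, not Iceberg code.
public class PartitionFieldIdReuseSketch {
  public static void main(String[] args) {
    // field IDs already assigned by the table's existing specs: identity(x) -> 1000, identity(y) -> 1001
    Map<String, Integer> existingFieldIds = new LinkedHashMap<>();
    existingFieldIds.put("1:identity", 1000);
    existingFieldIds.put("2:identity", 1001);

    // lastAssignedPartitionId of the table before the replacement
    AtomicInteger lastPartitionId = new AtomicInteger(1001);

    // the replacement spec asks for bucket[8] on column 3 (new) and identity on column 1 (already known)
    String[] requestedFields = {"3:bucket[8]", "1:identity"};
    for (String key : requestedFields) {
      // reuse the existing ID for an equivalent (source id, transform) pair, otherwise allocate the next ID
      int fieldId = existingFieldIds.computeIfAbsent(key, k -> lastPartitionId.incrementAndGet());
      System.out.println(key + " -> " + fieldId);
    }
    // prints 3:bucket[8] -> 1002 and 1:identity -> 1000, matching the expected
    // spec in testBuildReplacementForV2Table above
  }
}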