diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 5703cc4ad4d1..7c7ae5bf1d9f 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
@@ -40,7 +41,9 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
 
 /**
@@ -704,6 +707,57 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
         snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
 
+  private PartitionSpec reassignPartitionIds(PartitionSpec partitionSpec, TypeUtil.NextID nextID) {
+    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(partitionSpec.schema())
+        .withSpecId(partitionSpec.specId());
+
+    if (formatVersion > 1) {
+      // for v2 and later, reuse any existing field IDs, but reproduce the same spec
+      Map<Pair<Integer, String>, Integer> transformToFieldId = specs.stream()
+          .flatMap(spec -> spec.fields().stream())
+          .collect(Collectors.toMap(
+              field -> Pair.of(field.sourceId(), field.transform().toString()),
+              PartitionField::fieldId,
+              Math::max));
+
+      for (PartitionField field : partitionSpec.fields()) {
+        // reassign the partition field ids
+        int partitionFieldId = transformToFieldId.computeIfAbsent(
+            Pair.of(field.sourceId(), field.transform().toString()), k -> nextID.get());
+        specBuilder.add(
+            field.sourceId(),
+            partitionFieldId,
+            field.name(),
+            field.transform());
+      }
+
+    } else {
+      // for v1, preserve the existing spec and carry forward all fields, replacing missing fields with void
+      Map<Pair<Integer, String>, PartitionField> newFields = Maps.newLinkedHashMap();
+      for (PartitionField newField : partitionSpec.fields()) {
+        newFields.put(Pair.of(newField.sourceId(), newField.transform().toString()), newField);
+      }
+
+      for (PartitionField field : spec().fields()) {
+        // ensure each field is either carried forward or replaced with void
+        PartitionField newField = newFields.remove(Pair.of(field.sourceId(), field.transform().toString()));
+        if (newField != null) {
+          // copy the new field with the existing field ID
+          specBuilder.add(newField.sourceId(), field.fieldId(), newField.name(), newField.transform());
+        } else {
+          specBuilder.add(field.sourceId(), field.fieldId(), field.name(), Transforms.alwaysNull());
+        }
+      }
+
+      // add any remaining new fields at the end and assign new partition field IDs
+      for (PartitionField newField : newFields.values()) {
+        specBuilder.add(newField.sourceId(), nextID.get(), newField.name(), newField.transform());
+      }
+    }
+
+    return specBuilder.build();
+  }
+
+  // The caller is responsible for passing an updatedPartitionSpec with correct partition field IDs
   public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec updatedPartitionSpec,
                                         SortOrder updatedSortOrder, String newLocation,
@@ -721,16 +775,20 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update
     // rebuild the partition spec using the new column ids
     PartitionSpec freshSpec = freshSpec(nextSpecId, freshSchema, updatedPartitionSpec);
 
+    // reassign partition field ids with existing partition specs in the table
+    AtomicInteger lastPartitionId = new AtomicInteger(lastAssignedPartitionId);
+    PartitionSpec newSpec = reassignPartitionIds(freshSpec, lastPartitionId::incrementAndGet);
+
     // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
     int specId = specs.stream()
-        .filter(freshSpec::compatibleWith)
+        .filter(newSpec::compatibleWith)
         .findFirst()
         .map(PartitionSpec::specId)
        .orElse(nextSpecId);
 
     ImmutableList.Builder<PartitionSpec> specListBuilder = ImmutableList.<PartitionSpec>builder().addAll(specs);
     if (!specsById.containsKey(specId)) {
-      specListBuilder.add(freshSpec);
+      specListBuilder.add(newSpec);
     }
 
     // determine the next order id
@@ -766,7 +824,7 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update
 
     return new TableMetadata(null, formatVersion, uuid, newLocation,
         lastSequenceNumber, System.currentTimeMillis(), newLastColumnId.get(), freshSchemaId, schemasBuilder.build(),
-        specId, specListBuilder.build(), Math.max(lastAssignedPartitionId, freshSpec.lastAssignedFieldId()),
+        specId, specListBuilder.build(), Math.max(lastAssignedPartitionId, newSpec.lastAssignedFieldId()),
         orderId, sortOrdersBuilder.build(), ImmutableMap.copyOf(newProperties), -1, snapshots, ImmutableList.of(),
         addPreviousFile(file, lastUpdatedMillis, newProperties));
   }
@@ -826,7 +884,7 @@ private static PartitionSpec updateSpecSchema(Schema schema, PartitionSpec parti
 
     // add all of the fields to the builder. IDs should not change.
     for (PartitionField field : partitionSpec.fields()) {
-      specBuilder.add(field.sourceId(), field.fieldId(), field.name(), field.transform().toString());
+      specBuilder.add(field.sourceId(), field.fieldId(), field.name(), field.transform());
     }
 
     return specBuilder.build();
diff --git a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
index b822716fa949..e79e2975497f 100644
--- a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
@@ -32,6 +32,7 @@
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -79,8 +80,18 @@ public void testReplaceTransactionWithCustomSortOrder() {
     Assert.assertNull("Table should not have a current snapshot", table.currentSnapshot());
     Assert.assertEquals("Schema should match previous schema",
         schema.asStruct(), table.schema().asStruct());
-    Assert.assertEquals("Partition spec should have no fields",
-        0, table.spec().fields().size());
+
+    PartitionSpec v2Expected = PartitionSpec.builderFor(table.schema()).withSpecId(1).build();
+    V2Assert.assertEquals("Table should have an unpartitioned spec",
+        v2Expected, table.spec());
+
+    PartitionSpec v1Expected = PartitionSpec.builderFor(table.schema())
+        .alwaysNull("data", "data_bucket")
+        .withSpecId(1)
+        .build();
+    V1Assert.assertEquals("Table should have a spec with one void field",
+        v1Expected, table.spec());
+
     Assert.assertEquals("Table should have 2 orders", 2, table.sortOrders().size());
     SortOrder sortOrder = table.sortOrder();
     Assert.assertEquals("Order ID must match", 1, sortOrder.orderId());
@@ -117,8 +128,18 @@ public void testReplaceTransaction() {
     Assert.assertNull("Table should not have a current snapshot", table.currentSnapshot());
     Assert.assertEquals("Schema should match previous schema",
         schema.asStruct(), table.schema().asStruct());
-    Assert.assertEquals("Partition spec should have no fields",
-        0, table.spec().fields().size());
+
+    PartitionSpec v2Expected = PartitionSpec.builderFor(table.schema()).withSpecId(1).build();
+    V2Assert.assertEquals("Table should have an unpartitioned spec",
+        v2Expected, table.spec());
+
+    PartitionSpec v1Expected = PartitionSpec.builderFor(table.schema())
+        .alwaysNull("data", "data_bucket")
+        .withSpecId(1)
+        .build();
+    V1Assert.assertEquals("Table should have a spec with one void field",
+        v1Expected, table.spec());
+
     Assert.assertEquals("Table should have 1 order", 1, table.sortOrders().size());
     Assert.assertEquals("Table order ID should match", 0, table.sortOrder().orderId());
     Assert.assertTrue("Table should be unsorted", table.sortOrder().isUnsorted());
@@ -126,6 +147,8 @@ public void testReplaceTransaction() {
 
   @Test
   public void testReplaceWithIncompatibleSchemaUpdate() {
+    Assume.assumeTrue("Fails early for v1 tables because partition spec cannot drop a field", formatVersion == 2);
+
     Schema newSchema = new Schema(
         required(4, "obj_id", Types.IntegerType.get()));
 
@@ -175,8 +198,17 @@ public void testReplaceWithNewPartitionSpec() {
     Assert.assertNull("Table should not have a current snapshot", table.currentSnapshot());
     Assert.assertEquals("Schema should use new schema, not compatible with previous",
         schema.asStruct(), table.schema().asStruct());
-    Assert.assertEquals("Table should have new unpartitioned spec",
-        0, table.spec().fields().size());
+
+    PartitionSpec v2Expected = PartitionSpec.builderFor(table.schema()).withSpecId(1).build();
+    V2Assert.assertEquals("Table should have an unpartitioned spec",
+        v2Expected, table.spec());
+
+    PartitionSpec v1Expected = PartitionSpec.builderFor(table.schema())
+        .alwaysNull("data", "data_bucket")
+        .withSpecId(1)
+        .build();
+    V1Assert.assertEquals("Table should have a spec with one void field",
+        v1Expected, table.spec());
   }
 
   @Test
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
index d3cebec1f328..0be6b0adf5b8 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -600,6 +600,76 @@ public void testInvalidUpdatePartitionSpecForV1Table() throws Exception {
         () -> metadata.updatePartitionSpec(spec));
   }
 
+  @Test
+  public void testBuildReplacementForV1Table() {
+    Schema schema = new Schema(
+        Types.NestedField.required(1, "x", Types.LongType.get()),
+        Types.NestedField.required(2, "y", Types.LongType.get())
+    );
+    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(0)
+        .identity("x")
+        .identity("y")
+        .build();
+    String location = "file://tmp/db/table";
+    TableMetadata metadata = TableMetadata.newTableMetadata(
+        schema, spec, SortOrder.unsorted(), location, ImmutableMap.of(), 1);
+    Assert.assertEquals(spec, metadata.spec());
+
+    Schema updatedSchema = new Schema(
+        Types.NestedField.required(1, "x", Types.LongType.get()),
+        Types.NestedField.required(2, "z", Types.StringType.get()),
+        Types.NestedField.required(3, "y", Types.LongType.get())
+    );
+    PartitionSpec updatedSpec = PartitionSpec.builderFor(updatedSchema).withSpecId(0)
+        .bucket("z", 8)
+        .identity("x")
+        .build();
+    TableMetadata updated = metadata.buildReplacement(
+        updatedSchema, updatedSpec, SortOrder.unsorted(), location, ImmutableMap.of());
+    PartitionSpec expected = PartitionSpec.builderFor(updated.schema()).withSpecId(1)
+        .add(1, 1000, "x", "identity")
+        .add(2, 1001, "y", "void")
+        .add(3, 1002, "z_bucket", "bucket[8]")
+        .build();
+    Assert.assertEquals(
+        "Should reassign the partition field IDs and reuse any existing IDs for equivalent fields",
+        expected, updated.spec());
+  }
+
+  @Test
+  public void testBuildReplacementForV2Table() {
+    Schema schema = new Schema(
+        Types.NestedField.required(1, "x", Types.LongType.get()),
+        Types.NestedField.required(2, "y", Types.LongType.get())
+    );
+    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(0)
+        .identity("x")
+        .identity("y")
+        .build();
+    String location = "file://tmp/db/table";
+    TableMetadata metadata = TableMetadata.newTableMetadata(
+        schema, spec, SortOrder.unsorted(), location, ImmutableMap.of(), 2);
+    Assert.assertEquals(spec, metadata.spec());
+
+    Schema updatedSchema = new Schema(
+        Types.NestedField.required(1, "x", Types.LongType.get()),
+        Types.NestedField.required(2, "z", Types.StringType.get())
+    );
+    PartitionSpec updatedSpec = PartitionSpec.builderFor(updatedSchema).withSpecId(0)
+        .bucket("z", 8)
+        .identity("x")
+        .build();
+    TableMetadata updated = metadata.buildReplacement(
+        updatedSchema, updatedSpec, SortOrder.unsorted(), location, ImmutableMap.of());
+    PartitionSpec expected = PartitionSpec.builderFor(updated.schema()).withSpecId(1)
+        .add(3, 1002, "z_bucket", "bucket[8]")
+        .add(1, 1000, "x", "identity")
+        .build();
+    Assert.assertEquals(
+        "Should reassign the partition field IDs and reuse any existing IDs for equivalent fields",
+        expected, updated.spec());
+  }
+
   @Test
   public void testSortOrder() {
     Schema schema = new Schema(
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java
index 08fa022a0ab4..bebf5b5b32c1 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java
@@ -118,7 +118,13 @@ public void testReplaceTxnBuilder() throws Exception {
 
     table = catalog.loadTable(tableIdent);
     Assert.assertNull(table.currentSnapshot());
-    Assert.assertTrue(table.spec().isUnpartitioned());
+    PartitionSpec v1Expected = PartitionSpec.builderFor(table.schema())
+        .alwaysNull("data", "data_bucket")
+        .withSpecId(1)
+        .build();
+    Assert.assertEquals("Table should have a spec with one void field",
+        v1Expected, table.spec());
+
     Assert.assertEquals("value1", table.properties().get("key1"));
     Assert.assertEquals("value2", table.properties().get("key2"));
   }
diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
index 5292e3146ec7..7a71b0f134ab 100644
--- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
@@ -195,7 +195,13 @@ public void testReplaceTxnBuilder() {
 
     table = catalog.loadTable(tableIdent);
     Assert.assertNull(table.currentSnapshot());
-    Assert.assertTrue(table.spec().isUnpartitioned());
+    PartitionSpec v1Expected = PartitionSpec.builderFor(table.schema())
+        .alwaysNull("data", "data_bucket")
+        .withSpecId(1)
+        .build();
+    Assert.assertEquals("Table should have a spec with one void field",
+        v1Expected, table.spec());
+
     Assert.assertEquals("value1", table.properties().get("key1"));
     Assert.assertEquals("value2", table.properties().get("key2"));
   }
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java
index 93ba58d6e470..f291baf51f6e 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java
@@ -155,7 +155,12 @@ public void testReplaceTableTxn() {
     txn.commitTransaction();
 
     Table table = catalog.loadTable(TABLE_IDENTIFIER);
-    Assert.assertEquals("Partition spec should be unpartitioned", 0, table.spec().fields().size());
+    PartitionSpec v1Expected = PartitionSpec.builderFor(table.schema())
+        .alwaysNull("id", "id")
+        .withSpecId(1)
+        .build();
+    Assert.assertEquals("Table should have a spec with one void field",
+        v1Expected, table.spec());
   }
 
   @Test
@@ -233,7 +238,12 @@ public void testCreateOrReplaceTableTxnTableExists() {
     txn.commitTransaction();
 
     Table table = catalog.loadTable(TABLE_IDENTIFIER);
-    Assert.assertEquals("Partition spec should be unpartitioned", 0, table.spec().fields().size());
+    PartitionSpec v1Expected = PartitionSpec.builderFor(table.schema())
+        .alwaysNull("id", "id")
+        .withSpecId(1)
+        .build();
+    Assert.assertEquals("Table should have a spec with one void field",
+        v1Expected, table.spec());
   }
 
   @Test
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
index efd523c96aa5..3755e5f39017 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
@@ -176,7 +176,13 @@ public void testReplaceTxnBuilder() throws Exception {
       table = catalog.loadTable(tableIdent);
       Assert.assertEquals(newLocation, table.location());
       Assert.assertNull(table.currentSnapshot());
-      Assert.assertTrue(table.spec().isUnpartitioned());
+      PartitionSpec v1Expected = PartitionSpec.builderFor(table.schema())
+          .alwaysNull("data", "data_bucket")
+          .withSpecId(1)
+          .build();
+      Assert.assertEquals("Table should have a spec with one void field",
+          v1Expected, table.spec());
+
       Assert.assertEquals("value1", table.properties().get("key1"));
       Assert.assertEquals("value2", table.properties().get("key2"));
     } finally {