diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index 3d0ad686294b..8f90b5691a1a 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -180,6 +180,23 @@ public void applyTo(TableMetadata.Builder metadataBuilder) { } } + class RemoveSchemas implements MetadataUpdate { + private final Set schemaIds; + + public RemoveSchemas(Set schemaIds) { + this.schemaIds = schemaIds; + } + + public Set schemaIds() { + return schemaIds; + } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.removeSchemas(schemaIds); + } + } + class AddSortOrder implements MetadataUpdate { private final UnboundSortOrder sortOrder; diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java index 9159da0b647f..19c48de958bb 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java @@ -60,6 +60,7 @@ private MetadataUpdateParser() {} static final String SET_PARTITION_STATISTICS = "set-partition-statistics"; static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics"; static final String REMOVE_PARTITION_SPECS = "remove-partition-specs"; + static final String REMOVE_SCHEMAS = "remove-schemas"; static final String ENABLE_ROW_LINEAGE = "enable-row-lineage"; // AssignUUID @@ -131,6 +132,9 @@ private MetadataUpdateParser() {} // RemovePartitionSpecs private static final String SPEC_IDS = "spec-ids"; + // RemoveSchemas + private static final String SCHEMA_IDS = "schema-ids"; + private static final Map, String> ACTIONS = ImmutableMap., String>builder() .put(MetadataUpdate.AssignUUID.class, ASSIGN_UUID) @@ -155,6 +159,7 @@ private MetadataUpdateParser() {} .put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION) .put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION) .put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS) + .put(MetadataUpdate.RemoveSchemas.class, REMOVE_SCHEMAS) .put(MetadataUpdate.EnableRowLineage.class, ENABLE_ROW_LINEAGE) .buildOrThrow(); @@ -251,6 +256,9 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator case REMOVE_PARTITION_SPECS: writeRemovePartitionSpecs((MetadataUpdate.RemovePartitionSpecs) metadataUpdate, generator); break; + case REMOVE_SCHEMAS: + writeRemoveSchemas((MetadataUpdate.RemoveSchemas) metadataUpdate, generator); + break; case ENABLE_ROW_LINEAGE: break; default: @@ -326,6 +334,8 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) { return readCurrentViewVersionId(jsonNode); case REMOVE_PARTITION_SPECS: return readRemovePartitionSpecs(jsonNode); + case REMOVE_SCHEMAS: + return readRemoveSchemas(jsonNode); case ENABLE_ROW_LINEAGE: return new MetadataUpdate.EnableRowLineage(); default: @@ -468,6 +478,11 @@ private static void writeRemovePartitionSpecs( JsonUtil.writeIntegerArray(SPEC_IDS, metadataUpdate.specIds(), gen); } + private static void writeRemoveSchemas( + MetadataUpdate.RemoveSchemas metadataUpdate, JsonGenerator gen) throws IOException { + JsonUtil.writeIntegerArray(SCHEMA_IDS, metadataUpdate.schemaIds(), gen); + } + private static MetadataUpdate readAssignUUID(JsonNode node) { String uuid = JsonUtil.getString(UUID, node); return new MetadataUpdate.AssignUUID(uuid); @@ -614,4 +629,8 @@ private static MetadataUpdate readCurrentViewVersionId(JsonNode node) { private static MetadataUpdate readRemovePartitionSpecs(JsonNode node) { return new MetadataUpdate.RemovePartitionSpecs(JsonUtil.getIntegerSet(SPEC_IDS, node)); } + + private static MetadataUpdate readRemoveSchemas(JsonNode node) { + return new MetadataUpdate.RemoveSchemas(JsonUtil.getIntegerSet(SCHEMA_IDS, node)); + } } diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index 0cc89433413d..9418b0f00765 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -218,16 +218,21 @@ private TableMetadata internalApply() { updatedMetaBuilder.removeSnapshots(idsToRemove); if (cleanExpiredMetadata) { - // TODO: Support cleaning expired schema as well. Set reachableSpecs = Sets.newConcurrentHashSet(); reachableSpecs.add(base.defaultSpecId()); + Set reachableSchemas = Sets.newConcurrentHashSet(); + reachableSchemas.add(base.currentSchemaId()); + Tasks.foreach(idsToRetain) .executeWith(planExecutorService) .run( - snapshot -> - base.snapshot(snapshot).allManifests(ops.io()).stream() - .map(ManifestFile::partitionSpecId) - .forEach(reachableSpecs::add)); + snapshotId -> { + Snapshot snapshot = base.snapshot(snapshotId); + snapshot.allManifests(ops.io()).stream() + .map(ManifestFile::partitionSpecId) + .forEach(reachableSpecs::add); + reachableSchemas.add(snapshot.schemaId()); + }); Set specsToRemove = base.specs().stream() @@ -235,6 +240,13 @@ private TableMetadata internalApply() { .filter(specId -> !reachableSpecs.contains(specId)) .collect(Collectors.toSet()); updatedMetaBuilder.removeSpecs(specsToRemove); + + Set schemasToRemove = + base.schemas().stream() + .map(Schema::schemaId) + .filter(schemaId -> !reachableSchemas.contains(schemaId)) + .collect(Collectors.toSet()); + updatedMetaBuilder.removeSchemas(schemasToRemove); } return updatedMetaBuilder.build(); diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 251f6777255a..c4a328dd7999 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -913,7 +913,7 @@ public static class Builder { private long lastSequenceNumber; private int lastColumnId; private int currentSchemaId; - private final List schemas; + private List schemas; private int defaultSpecId; private List specs; private int lastAssignedPartitionId; @@ -1168,6 +1168,23 @@ Builder removeSpecs(Iterable specIds) { .filter(s -> !specIdsToRemove.contains(s.specId())) .collect(Collectors.toList()); changes.add(new MetadataUpdate.RemovePartitionSpecs(specIdsToRemove)); + + return this; + } + + Builder removeSchemas(Iterable schemaIds) { + Set schemaIdsToRemove = Sets.newHashSet(schemaIds); + Preconditions.checkArgument( + !schemaIdsToRemove.contains(currentSchemaId), "Cannot remove the current schema"); + + if (!schemaIdsToRemove.isEmpty()) { + this.schemas = + schemas.stream() + .filter(s -> !schemaIdsToRemove.contains(s.schemaId())) + .collect(Collectors.toList()); + changes.add(new MetadataUpdate.RemoveSchemas(schemaIdsToRemove)); + } + return this; } diff --git a/core/src/main/java/org/apache/iceberg/UpdateRequirements.java b/core/src/main/java/org/apache/iceberg/UpdateRequirements.java index 95369d51934d..fdf33c894992 100644 --- a/core/src/main/java/org/apache/iceberg/UpdateRequirements.java +++ b/core/src/main/java/org/apache/iceberg/UpdateRequirements.java @@ -107,6 +107,8 @@ private Builder update(MetadataUpdate update) { update((MetadataUpdate.SetDefaultSortOrder) update); } else if (update instanceof MetadataUpdate.RemovePartitionSpecs) { update((MetadataUpdate.RemovePartitionSpecs) update); + } else if (update instanceof MetadataUpdate.RemoveSchemas) { + update((MetadataUpdate.RemoveSchemas) update); } return this; @@ -136,13 +138,7 @@ private void update(MetadataUpdate.AddSchema unused) { } private void update(MetadataUpdate.SetCurrentSchema unused) { - if (!setSchemaId) { - if (base != null && !isReplace) { - // require that the current schema has not changed - require(new UpdateRequirement.AssertCurrentSchemaID(base.currentSchemaId())); - } - this.setSchemaId = true; - } + requireCurrentSchemaNotChanged(); } private void update(MetadataUpdate.AddPartitionSpec unused) { @@ -156,13 +152,7 @@ private void update(MetadataUpdate.AddPartitionSpec unused) { } private void update(MetadataUpdate.SetDefaultPartitionSpec unused) { - if (!setSpecId) { - if (base != null && !isReplace) { - // require that the default spec has not changed - require(new UpdateRequirement.AssertDefaultSpecID(base.defaultSpecId())); - } - this.setSpecId = true; - } + requireDefaultPartitionSpecNotChanged(); } private void update(MetadataUpdate.SetDefaultSortOrder unused) { @@ -176,15 +166,38 @@ private void update(MetadataUpdate.SetDefaultSortOrder unused) { } private void update(MetadataUpdate.RemovePartitionSpecs unused) { - // require that the default partition spec has not changed + requireDefaultPartitionSpecNotChanged(); + + // require that no branches have changed, so that old specs won't be written. + requireNoBranchesChanged(); + } + + private void update(MetadataUpdate.RemoveSchemas unused) { + requireCurrentSchemaNotChanged(); + + // require that no branches have changed, so that old schemas won't be written. + requireNoBranchesChanged(); + } + + private void requireDefaultPartitionSpecNotChanged() { if (!setSpecId) { if (base != null && !isReplace) { require(new UpdateRequirement.AssertDefaultSpecID(base.defaultSpecId())); } this.setSpecId = true; } + } - // require that no branches have changed, so that old specs won't be written. + private void requireCurrentSchemaNotChanged() { + if (!setSchemaId) { + if (base != null && !isReplace) { + require(new UpdateRequirement.AssertCurrentSchemaID(base.currentSchemaId())); + } + this.setSchemaId = true; + } + } + + private void requireNoBranchesChanged() { if (base != null && !isReplace) { base.refs() .forEach( diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java index 741184d612f1..4d66163d6c79 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java @@ -930,6 +930,17 @@ public void testRemovePartitionSpec() { .isEqualTo(json); } + @Test + public void testRemoveSchemas() { + String action = MetadataUpdateParser.REMOVE_SCHEMAS; + String json = "{\"action\":\"remove-schemas\",\"schema-ids\":[1,2,3]}"; + MetadataUpdate expected = new MetadataUpdate.RemoveSchemas(ImmutableSet.of(1, 2, 3)); + assertEquals(action, expected, MetadataUpdateParser.fromJson(json)); + assertThat(MetadataUpdateParser.toJson(expected)) + .as("Remove schemas should convert to the correct JSON value") + .isEqualTo(json); + } + @Test public void testEnableRowLineage() { String action = MetadataUpdateParser.ENABLE_ROW_LINEAGE; @@ -1050,6 +1061,11 @@ public void assertEquals( (MetadataUpdate.RemovePartitionSpecs) expectedUpdate, (MetadataUpdate.RemovePartitionSpecs) actualUpdate); break; + case MetadataUpdateParser.REMOVE_SCHEMAS: + assertEqualsRemoveSchemas( + (MetadataUpdate.RemoveSchemas) expectedUpdate, + (MetadataUpdate.RemoveSchemas) actualUpdate); + break; case MetadataUpdateParser.ENABLE_ROW_LINEAGE: assertThat(actualUpdate).isInstanceOf(MetadataUpdate.EnableRowLineage.class); break; @@ -1279,6 +1295,11 @@ private static void assertEqualsRemovePartitionSpecs( assertThat(actual.specIds()).containsExactlyInAnyOrderElementsOf(expected.specIds()); } + private static void assertEqualsRemoveSchemas( + MetadataUpdate.RemoveSchemas expected, MetadataUpdate.RemoveSchemas actual) { + assertThat(actual.schemaIds()).containsExactlyInAnyOrderElementsOf(expected.schemaIds()); + } + private String createManifestListWithManifestFiles(long snapshotId, Long parentSnapshotId) throws IOException { File manifestList = File.createTempFile("manifests", null, temp.toFile()); diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java index 0fe76a4e518d..0a1c9cf21570 100644 --- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java @@ -21,6 +21,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assumptions.assumeThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; import java.io.File; import java.io.IOException; @@ -48,8 +50,10 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; @ExtendWith(ParameterizedTestExtension.class) public class TestRemoveSnapshots extends TestBase { @@ -1705,6 +1709,77 @@ public void testRemoveSpecsDoesntRemoveDefaultSpec() throws IOException { .containsExactly(dataBucketSpec.specId()); } + @TestTemplate + public void testRemoveSchemas() { + table.newAppend().appendFile(FILE_A).commit(); + + Set expectedDeletedFiles = Sets.newHashSet(); + expectedDeletedFiles.add(table.currentSnapshot().manifestListLocation()); + + table.updateSchema().addColumn("extra_col1", Types.StringType.get()).commit(); + + table.newAppend().appendFile(FILE_B).commit(); + expectedDeletedFiles.add(table.currentSnapshot().manifestListLocation()); + + table.updateSchema().addColumn("extra_col2", Types.LongType.get()).deleteColumn("id").commit(); + + table.newAppend().appendFile(FILE_A2).commit(); + + assertThat(table.schemas()).hasSize(3); + + Set deletedFiles = Sets.newHashSet(); + // Expire all snapshots and schemas except the current ones. + removeSnapshots(table) + .expireOlderThan(System.currentTimeMillis()) + .cleanExpiredMetadata(true) + .deleteWith(deletedFiles::add) + .commit(); + + assertThat(deletedFiles).containsExactlyInAnyOrderElementsOf(expectedDeletedFiles); + assertThat(table.schemas().values()).containsExactly(table.schema()); + } + + @TestTemplate + public void testNoSchemasToRemove() { + String tableName = "test_no_schemas_to_remove"; + TestTables.TestTableOperations ops = + Mockito.spy(new TestTables.TestTableOperations(tableName, tableDir)); + TestTables.TestTable table = + TestTables.create( + tableDir, + tableName, + SCHEMA, + PartitionSpec.unpartitioned(), + SortOrder.unsorted(), + formatVersion, + ops); + + table.newAppend().appendFile(FILE_A).commit(); + + Set expectedDeletedFiles = Sets.newHashSet(); + expectedDeletedFiles.add(table.currentSnapshot().manifestListLocation()); + + table.newAppend().appendFile(FILE_B).commit(); + + Set deletedFiles = Sets.newHashSet(); + // Expire all snapshots except the current one. No unused schemas to be removed. + removeSnapshots(table) + .expireOlderThan(System.currentTimeMillis()) + .cleanExpiredMetadata(true) + .deleteWith(deletedFiles::add) + .commit(); + + assertThat(deletedFiles).containsExactlyInAnyOrderElementsOf(expectedDeletedFiles); + assertThat(table.schemas().values()).containsExactly(table.schema()); + Mockito.verify(ops, Mockito.never()) + .commit( + any(), + argThat( + meta -> + meta.changes().stream() + .anyMatch(u -> u instanceof MetadataUpdate.RemoveSchemas))); + } + private Set manifestPaths(Snapshot snapshot, FileIO io) { return snapshot.allManifests(io).stream().map(ManifestFile::path).collect(Collectors.toSet()); } diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index 1cd60fbbd177..07b4b0591646 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -1858,4 +1858,20 @@ public void onlyMetadataLocationIsUpdatedWithoutTimestampAndMetadataLogEntry() { assertThat(updatedMetadata.metadataFileLocation()).isEqualTo("updated-metadata-location"); assertThat(updatedMetadata.previousFiles()).isEmpty(); } + + @Test + public void testMetadataWithRemoveSchemas() { + TableMetadata meta = + TableMetadata.buildFrom( + TableMetadata.newTableMetadata( + TestBase.SCHEMA, PartitionSpec.unpartitioned(), null, ImmutableMap.of())) + .removeSchemas(Sets.newHashSet()) + .build(); + + assertThat(meta.changes()).noneMatch(u -> u instanceof MetadataUpdate.RemoveSchemas); + + meta = TableMetadata.buildFrom(meta).removeSchemas(Sets.newHashSet(1, 2)).build(); + + assertThat(meta.changes()).anyMatch(u -> u instanceof MetadataUpdate.RemoveSchemas); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java index cf1480747c45..4c45b8f701cc 100644 --- a/core/src/test/java/org/apache/iceberg/TestTables.java +++ b/core/src/test/java/org/apache/iceberg/TestTables.java @@ -59,7 +59,22 @@ public static TestTable create( PartitionSpec spec, SortOrder sortOrder, int formatVersion) { - return createTable(temp, name, schema, spec, formatVersion, ImmutableMap.of(), sortOrder, null); + TestTableOperations ops = new TestTableOperations(name, temp); + + return createTable( + temp, name, schema, spec, formatVersion, ImmutableMap.of(), sortOrder, null, ops); + } + + public static TestTable create( + File temp, + String name, + Schema schema, + PartitionSpec spec, + SortOrder sortOrder, + int formatVersion, + TestTableOperations ops) { + return createTable( + temp, name, schema, spec, formatVersion, ImmutableMap.of(), sortOrder, null, ops); } public static TestTable create( @@ -70,8 +85,10 @@ public static TestTable create( SortOrder sortOrder, int formatVersion, MetricsReporter reporter) { + TestTableOperations ops = new TestTableOperations(name, temp); + return createTable( - temp, name, schema, spec, formatVersion, ImmutableMap.of(), sortOrder, reporter); + temp, name, schema, spec, formatVersion, ImmutableMap.of(), sortOrder, reporter, ops); } public static TestTable create( @@ -81,8 +98,10 @@ public static TestTable create( PartitionSpec spec, int formatVersion, Map properties) { + TestTableOperations ops = new TestTableOperations(name, temp); + return createTable( - temp, name, schema, spec, formatVersion, properties, SortOrder.unsorted(), null); + temp, name, schema, spec, formatVersion, properties, SortOrder.unsorted(), null, ops); } private static TestTable createTable( @@ -93,8 +112,8 @@ private static TestTable createTable( int formatVersion, Map properties, SortOrder sortOrder, - MetricsReporter reporter) { - TestTableOperations ops = new TestTableOperations(name, temp); + MetricsReporter reporter, + TestTableOperations ops) { if (ops.current() != null) { throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp); } diff --git a/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java b/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java index 3f1329d4f040..25075fc5251d 100644 --- a/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java +++ b/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java @@ -301,11 +301,7 @@ public void setCurrentSchema() { assertTableUUID(requirements); - assertThat(requirements) - .element(1) - .asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertCurrentSchemaID.class)) - .extracting(UpdateRequirement.AssertCurrentSchemaID::schemaId) - .isEqualTo(schemaId); + assertCurrentSchemaId(requirements, 1, schemaId); } @Test @@ -399,11 +395,7 @@ public void setDefaultPartitionSpec() { assertTableUUID(requirements); - assertThat(requirements) - .element(1) - .asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertDefaultSpecID.class)) - .extracting(UpdateRequirement.AssertDefaultSpecID::specId) - .isEqualTo(specId); + assertDefaultSpecId(requirements, 1, specId); } @Test @@ -442,11 +434,7 @@ public void removePartitionSpec() { assertTableUUID(requirements); - assertThat(requirements) - .element(1) - .asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertDefaultSpecID.class)) - .extracting(UpdateRequirement.AssertDefaultSpecID::specId) - .isEqualTo(defaultSpecId); + assertDefaultSpecId(requirements, 1, defaultSpecId); } @Test @@ -455,12 +443,7 @@ public void testRemovePartitionSpecsWithBranch() { long snapshotId = 42L; when(metadata.defaultSpecId()).thenReturn(defaultSpecId); - String branch = "branch"; - SnapshotRef snapshotRef = mock(SnapshotRef.class); - when(snapshotRef.snapshotId()).thenReturn(snapshotId); - when(snapshotRef.isBranch()).thenReturn(true); - when(metadata.refs()).thenReturn(ImmutableMap.of(branch, snapshotRef)); - when(metadata.ref(branch)).thenReturn(snapshotRef); + mockBranch("branch", snapshotId); List requirements = UpdateRequirements.forUpdateTable( @@ -477,21 +460,13 @@ public void testRemovePartitionSpecsWithBranch() { assertTableUUID(requirements); - assertThat(requirements) - .element(1) - .asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertDefaultSpecID.class)) - .extracting(UpdateRequirement.AssertDefaultSpecID::specId) - .isEqualTo(defaultSpecId); + assertDefaultSpecId(requirements, 1, defaultSpecId); - assertThat(requirements) - .element(2) - .asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertRefSnapshotID.class)) - .extracting(UpdateRequirement.AssertRefSnapshotID::snapshotId) - .isEqualTo(snapshotId); + assertRefSnapshotId(requirements, 2, snapshotId); } @Test - public void testRemovePartitionSpecsFailure() { + public void testRemovePartitionSpecsWithSpecChangedFailure() { int defaultSpecId = 3; when(metadata.defaultSpecId()).thenReturn(defaultSpecId); when(updated.defaultSpecId()).thenReturn(defaultSpecId + 1); @@ -509,28 +484,121 @@ public void testRemovePartitionSpecsFailure() { } @Test - public void testRemovePartitionSpecsWithBranchFailure() { + public void testRemovePartitionSpecsWithBranchChangedFailure() { int defaultSpecId = 3; - long snapshotId = 42L; when(metadata.defaultSpecId()).thenReturn(defaultSpecId); when(updated.defaultSpecId()).thenReturn(defaultSpecId); + long snapshotId = 42L; String branch = "test"; + mockBranchChanged(branch, snapshotId); + + List requirements = + UpdateRequirements.forUpdateTable( + metadata, + ImmutableList.of(new MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2)))); + + assertThatThrownBy(() -> requirements.forEach(req -> req.validate(updated))) + .isInstanceOf(CommitFailedException.class) + .hasMessage( + "Requirement failed: branch %s has changed: expected id %s != %s", + branch, snapshotId, snapshotId + 1); + } + + private void mockBranchChanged(String branch, long snapshotId) { + mockBranch(branch, snapshotId); + + SnapshotRef updatedRef = mock(SnapshotRef.class); + when(updatedRef.snapshotId()).thenReturn(snapshotId + 1); + when(updatedRef.isBranch()).thenReturn(true); + when(updated.ref(branch)).thenReturn(updatedRef); + } + + private void mockBranch(String branch, long snapshotId) { SnapshotRef snapshotRef = mock(SnapshotRef.class); when(snapshotRef.snapshotId()).thenReturn(snapshotId); when(snapshotRef.isBranch()).thenReturn(true); when(metadata.refs()).thenReturn(ImmutableMap.of(branch, snapshotRef)); when(metadata.ref(branch)).thenReturn(snapshotRef); + } - SnapshotRef updatedRef = mock(SnapshotRef.class); - when(updatedRef.snapshotId()).thenReturn(snapshotId + 1); - when(updatedRef.isBranch()).thenReturn(true); - when(updated.ref(branch)).thenReturn(updatedRef); + @Test + public void removeSchemas() { + int currentSchemaId = 3; + when(metadata.currentSchemaId()).thenReturn(currentSchemaId); List requirements = UpdateRequirements.forUpdateTable( - metadata, - ImmutableList.of(new MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2)))); + metadata, ImmutableList.of(new MetadataUpdate.RemoveSchemas(Sets.newHashSet(1, 2)))); + requirements.forEach(req -> req.validate(metadata)); + + assertThat(requirements) + .hasSize(2) + .hasOnlyElementsOfTypes( + UpdateRequirement.AssertTableUUID.class, UpdateRequirement.AssertCurrentSchemaID.class); + + assertTableUUID(requirements); + + assertCurrentSchemaId(requirements, 1, currentSchemaId); + } + + @Test + public void testRemoveSchemasWithBranch() { + int currentSchemaId = 3; + long snapshotId = 42L; + when(metadata.currentSchemaId()).thenReturn(currentSchemaId); + + mockBranch("branch", snapshotId); + + List requirements = + UpdateRequirements.forUpdateTable( + metadata, ImmutableList.of(new MetadataUpdate.RemoveSchemas(Sets.newHashSet(1, 2)))); + requirements.forEach(req -> req.validate(metadata)); + + assertThat(requirements) + .hasSize(3) + .hasOnlyElementsOfTypes( + UpdateRequirement.AssertTableUUID.class, + UpdateRequirement.AssertCurrentSchemaID.class, + UpdateRequirement.AssertRefSnapshotID.class); + + assertTableUUID(requirements); + + assertCurrentSchemaId(requirements, 1, currentSchemaId); + + assertRefSnapshotId(requirements, 2, snapshotId); + } + + @Test + public void testRemoveSchemasWithSchemaChangedFailure() { + int currentSchemaId = 3; + when(metadata.currentSchemaId()).thenReturn(currentSchemaId); + when(updated.currentSchemaId()).thenReturn(currentSchemaId + 1); + + List requirements = + UpdateRequirements.forUpdateTable( + metadata, ImmutableList.of(new MetadataUpdate.RemoveSchemas(Sets.newHashSet(1, 2)))); + + assertThatThrownBy(() -> requirements.forEach(req -> req.validate(updated))) + .isInstanceOf(CommitFailedException.class) + .hasMessage( + "Requirement failed: current schema changed: expected id %s != %s", + currentSchemaId, currentSchemaId + 1); + } + + @Test + public void testRemoveSchemasWithBranchChangedFailure() { + int currentSchemaId = 3; + when(metadata.currentSchemaId()).thenReturn(currentSchemaId); + when(updated.currentSchemaId()).thenReturn(currentSchemaId); + + long snapshotId = 42L; + String branch = "test"; + mockBranchChanged(branch, snapshotId); + + List requirements = + UpdateRequirements.forUpdateTable( + metadata, ImmutableList.of(new MetadataUpdate.RemoveSchemas(Sets.newHashSet(1, 2)))); assertThatThrownBy(() -> requirements.forEach(req -> req.validate(updated))) .isInstanceOf(CommitFailedException.class) @@ -902,4 +970,30 @@ private void assertViewUUID(List requirements) { .extracting(UpdateRequirement.AssertViewUUID::uuid) .isEqualTo(viewMetadata.uuid()); } + + private void assertDefaultSpecId( + List requirements, int idx, int defaultSpecId) { + assertThat(requirements) + .element(idx) + .asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertDefaultSpecID.class)) + .extracting(UpdateRequirement.AssertDefaultSpecID::specId) + .isEqualTo(defaultSpecId); + } + + private void assertCurrentSchemaId( + List requirements, int idx, int currentSchemaId) { + assertThat(requirements) + .element(idx) + .asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertCurrentSchemaID.class)) + .extracting(UpdateRequirement.AssertCurrentSchemaID::schemaId) + .isEqualTo(currentSchemaId); + } + + private void assertRefSnapshotId(List requirements, int idx, long snapshotId) { + assertThat(requirements) + .element(idx) + .asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertRefSnapshotID.class)) + .extracting(UpdateRequirement.AssertRefSnapshotID::snapshotId) + .isEqualTo(snapshotId); + } } diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index 58b67e953f3e..b7cfd9b16be4 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -47,6 +47,7 @@ import org.apache.iceberg.ReachableFileUtil; import org.apache.iceberg.ReplaceSortOrder; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; @@ -1426,6 +1427,66 @@ public void testRemoveUnusedSpec(boolean withBranch) { } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testRemoveUnusedSchemas(boolean withBranch) { + String branch = "test"; + C catalog = catalog(); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(NS); + } + + Table table = + catalog + .buildTable(TABLE, SCHEMA) + .withPartitionSpec(SPEC) + .withProperty(TableProperties.GC_ENABLED, "true") + .create(); + + table.newFastAppend().appendFile(FILE_A).commit(); + Snapshot firstSnapshot = table.currentSnapshot(); + if (withBranch) { + table.manageSnapshots().createBranch(branch).commit(); + } + + table.updateSchema().addColumn("col_to_delete", Types.IntegerType.get()).commit(); + table.updateSchema().deleteColumn("col_to_delete").commit(); + table.updateSchema().addColumn("extra_col", Types.StringType.get()).commit(); + + assertThat(table.schemas().values()).as("Should have 3 total schemas").hasSize(3); + + // Keeps the schema used by the single snapshot and the current schema. + // Doesn't remove snapshots. + table.expireSnapshots().cleanExpiredMetadata(true).commit(); + + Table loaded = catalog.loadTable(TABLE); + assertThat(loaded.snapshot(firstSnapshot.snapshotId())).isNotNull(); + assertThat(loaded.schemas().keySet()) + .containsExactlyInAnyOrder(firstSnapshot.schemaId(), loaded.schema().schemaId()); + + table.updateSchema().addColumn("extra_col2", Types.LongType.get()).commit(); + table.newFastAppend().appendFile(FILE_B).commit(); + + table + .expireSnapshots() + .expireOlderThan(table.currentSnapshot().timestampMillis()) + .cleanExpiredMetadata(true) + .commit(); + + loaded = catalog.loadTable(TABLE); + if (withBranch) { + assertThat(loaded.snapshots()) + .containsExactlyInAnyOrder(firstSnapshot, loaded.currentSnapshot()); + assertThat(loaded.schemas().keySet()) + .containsExactlyInAnyOrder(firstSnapshot.schemaId(), loaded.currentSnapshot().schemaId()); + } else { + assertThat(loaded.snapshot(firstSnapshot.snapshotId())).isNull(); + assertThat(loaded.schemas().keySet()) + .containsExactlyInAnyOrder(loaded.currentSnapshot().schemaId()); + } + } + @Test public void testUpdateTableSortOrder() { C catalog = catalog();