diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index ced373ef85aa..e843c4d28f2b 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -181,6 +181,23 @@ public void applyTo(TableMetadata.Builder metadataBuilder) { } } + class RemoveSortOrders implements MetadataUpdate { + private final Set sortOrderIds; + + public RemoveSortOrders(Set sortOrderIds) { + this.sortOrderIds = sortOrderIds; + } + + public Set sortOrderIds() { + return sortOrderIds; + } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.removeSortOrders(sortOrderIds); + } + } + class RemoveSchemas implements MetadataUpdate { private final Set schemaIds; diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java index 280fe7565d75..c0f81c514e9d 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java @@ -59,6 +59,7 @@ private MetadataUpdateParser() {} static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics"; static final String REMOVE_PARTITION_SPECS = "remove-partition-specs"; static final String REMOVE_SCHEMAS = "remove-schemas"; + static final String REMOVE_SORT_ORDERS = "remove-sort-orders"; static final String ADD_ENCRYPTION_KEY = "add-encryption-key"; static final String REMOVE_ENCRYPTION_KEY = "remove-encryption-key"; @@ -134,6 +135,9 @@ private MetadataUpdateParser() {} // RemoveSchemas private static final String SCHEMA_IDS = "schema-ids"; + // RemoveSortOrders + private static final String SORT_ORDER_IDS = "sort-order-ids"; + // AddEncryptionKey private static final String ENCRYPTION_KEY = "encryption-key"; @@ -165,6 +169,7 @@ private MetadataUpdateParser() {} .put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION) .put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS) .put(MetadataUpdate.RemoveSchemas.class, REMOVE_SCHEMAS) + .put(MetadataUpdate.RemoveSortOrders.class, REMOVE_SORT_ORDERS) .put(MetadataUpdate.AddEncryptionKey.class, ADD_ENCRYPTION_KEY) .put(MetadataUpdate.RemoveEncryptionKey.class, REMOVE_ENCRYPTION_KEY) .buildOrThrow(); @@ -271,6 +276,9 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator case REMOVE_ENCRYPTION_KEY: writeRemoveEncryptionKey((MetadataUpdate.RemoveEncryptionKey) metadataUpdate, generator); break; + case REMOVE_SORT_ORDERS: + writeRemoveSortOrders((MetadataUpdate.RemoveSortOrders) metadataUpdate, generator); + break; default: throw new IllegalArgumentException( String.format( @@ -350,6 +358,8 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) { return readAddEncryptionKey(jsonNode); case REMOVE_ENCRYPTION_KEY: return readRemoveEncryptionKey(jsonNode); + case REMOVE_SORT_ORDERS: + return readSortOrders(jsonNode); default: throw new UnsupportedOperationException( String.format("Cannot convert metadata update action to json: %s", action)); @@ -493,6 +503,11 @@ private static void writeRemoveSchemas( JsonUtil.writeIntegerArray(SCHEMA_IDS, metadataUpdate.schemaIds(), gen); } + private static void writeRemoveSortOrders( + MetadataUpdate.RemoveSortOrders metadataUpdate, JsonGenerator gen) throws IOException { + JsonUtil.writeIntegerArray(SORT_ORDER_IDS, metadataUpdate.sortOrderIds(), gen); + } + private static void writeAddEncryptionKey( MetadataUpdate.AddEncryptionKey update, JsonGenerator gen) throws IOException { gen.writeFieldName(ENCRYPTION_KEY); @@ -654,6 +669,10 @@ private static MetadataUpdate readRemoveSchemas(JsonNode node) { return new MetadataUpdate.RemoveSchemas(JsonUtil.getIntegerSet(SCHEMA_IDS, node)); } + private static MetadataUpdate readSortOrders(JsonNode node) { + return new MetadataUpdate.RemoveSortOrders(JsonUtil.getIntegerSet(SORT_ORDER_IDS, node)); + } + private static MetadataUpdate readAddEncryptionKey(JsonNode node) { JsonNode keyNode = node.get(ENCRYPTION_KEY); Preconditions.checkArgument( diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index 1c94b988b4ca..fc8adb206456 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -38,10 +38,12 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -224,16 +226,31 @@ private TableMetadata internalApply() { reachableSpecs.add(base.defaultSpecId()); Set reachableSchemas = Sets.newConcurrentHashSet(); reachableSchemas.add(base.currentSchemaId()); + Set reachableSortOrders = Sets.newConcurrentHashSet(); + reachableSortOrders.add(base.defaultSortOrderId()); Tasks.foreach(idsToRetain) .executeWith(planExecutorService()) .run( snapshotId -> { Snapshot snapshot = base.snapshot(snapshotId); - snapshot.allManifests(ops.io()).stream() - .map(ManifestFile::partitionSpecId) - .forEach(reachableSpecs::add); + List manifests = snapshot.allManifests(ops.io()); + + // Collect partition spec IDs + manifests.stream().map(ManifestFile::partitionSpecId).forEach(reachableSpecs::add); + reachableSchemas.add(snapshot.schemaId()); + + // Collect sort order IDs from live entries + manifests.stream() + .map(manifestFile -> ManifestFiles.open(manifestFile, ops.io())) + .map(ManifestReader::entries) + .flatMap(entries -> StreamSupport.stream(entries.spliterator(), false)) + .filter(ManifestEntry::isLive) + .map(ManifestEntry::file) + .map(ContentFile::sortOrderId) + .filter(Objects::nonNull) + .forEach(reachableSortOrders::add); }); Set specsToRemove = @@ -249,6 +266,13 @@ private TableMetadata internalApply() { .filter(schemaId -> !reachableSchemas.contains(schemaId)) .collect(Collectors.toSet()); updatedMetaBuilder.removeSchemas(schemasToRemove); + + Set sortOrdersToRemove = + base.sortOrders().stream() + .map(SortOrder::orderId) + .filter(sortId -> !reachableSortOrders.contains(sortId)) + .collect(Collectors.toSet()); + updatedMetaBuilder.removeSortOrders(sortOrdersToRemove); } return updatedMetaBuilder.build(); diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 3c2a3eb9b7a7..d32a62e0c93d 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -1229,6 +1229,21 @@ public Builder addSortOrder(SortOrder order) { return this; } + Builder removeSortOrders(Iterable sortIds) { + Set sortIdsToRemove = Sets.newHashSet(sortIds); + Preconditions.checkArgument( + !sortIdsToRemove.contains(defaultSortOrderId), "Cannot remove the default sort-id"); + + if (!sortIdsToRemove.isEmpty()) { + this.sortOrders = + sortOrders.stream() + .filter(s -> !sortIdsToRemove.contains(s.orderId())) + .collect(Collectors.toList()); + changes.add(new MetadataUpdate.RemoveSortOrders(sortIdsToRemove)); + } + return this; + } + public Builder addSnapshot(Snapshot snapshot) { if (snapshot == null) { // change is a noop diff --git a/core/src/main/java/org/apache/iceberg/UpdateRequirements.java b/core/src/main/java/org/apache/iceberg/UpdateRequirements.java index fdf33c894992..a5026f1099f9 100644 --- a/core/src/main/java/org/apache/iceberg/UpdateRequirements.java +++ b/core/src/main/java/org/apache/iceberg/UpdateRequirements.java @@ -109,6 +109,8 @@ private Builder update(MetadataUpdate update) { update((MetadataUpdate.RemovePartitionSpecs) update); } else if (update instanceof MetadataUpdate.RemoveSchemas) { update((MetadataUpdate.RemoveSchemas) update); + } else if (update instanceof MetadataUpdate.RemoveSortOrders) { + update((MetadataUpdate.RemoveSortOrders) update); } return this; @@ -156,13 +158,7 @@ private void update(MetadataUpdate.SetDefaultPartitionSpec unused) { } private void update(MetadataUpdate.SetDefaultSortOrder unused) { - if (!setOrderId) { - if (base != null && !isReplace) { - // require that the default write order has not changed - require(new UpdateRequirement.AssertDefaultSortOrderID(base.defaultSortOrderId())); - } - this.setOrderId = true; - } + requireDefaultSortOrderNotChanged(); } private void update(MetadataUpdate.RemovePartitionSpecs unused) { @@ -179,6 +175,12 @@ private void update(MetadataUpdate.RemoveSchemas unused) { requireNoBranchesChanged(); } + private void update(MetadataUpdate.RemoveSortOrders unused) { + requireDefaultSortOrderNotChanged(); + + requireNoBranchesChanged(); + } + private void requireDefaultPartitionSpecNotChanged() { if (!setSpecId) { if (base != null && !isReplace) { @@ -197,6 +199,16 @@ private void requireCurrentSchemaNotChanged() { } } + private void requireDefaultSortOrderNotChanged() { + if (!setOrderId) { + if (base != null && !isReplace) { + // require that the default write order has not changed + require(new UpdateRequirement.AssertDefaultSortOrderID(base.defaultSortOrderId())); + } + this.setOrderId = true; + } + } + private void requireNoBranchesChanged() { if (base != null && !isReplace) { base.refs() diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java index ce80377c90a5..7559555c26a8 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java @@ -970,6 +970,17 @@ public void testRemoveSchemas() { .isEqualTo(json); } + @Test + public void testRemoveSortOrders() { + String action = MetadataUpdateParser.REMOVE_SORT_ORDERS; + String json = "{\"action\":\"remove-sort-orders\",\"sort-order-ids\":[1,2,3]}"; + MetadataUpdate expected = new MetadataUpdate.RemoveSortOrders(ImmutableSet.of(1, 2, 3)); + assertEquals(action, expected, MetadataUpdateParser.fromJson(json)); + assertThat(MetadataUpdateParser.toJson(expected)) + .as("Remove Sort Orders should convert to the correct JSON value") + .isEqualTo(json); + } + @Test public void testAddEncryptionKey() { byte[] keyBytes = "key".getBytes(StandardCharsets.UTF_8); @@ -1113,6 +1124,11 @@ public void assertEquals( (MetadataUpdate.RemoveSchemas) expectedUpdate, (MetadataUpdate.RemoveSchemas) actualUpdate); break; + case MetadataUpdateParser.REMOVE_SORT_ORDERS: + assertEqualsRemoveSortOrders( + (MetadataUpdate.RemoveSortOrders) expectedUpdate, + (MetadataUpdate.RemoveSortOrders) actualUpdate); + break; case MetadataUpdateParser.ADD_ENCRYPTION_KEY: assertEqualsAddEncryptionKey( (MetadataUpdate.AddEncryptionKey) expectedUpdate, @@ -1354,6 +1370,11 @@ private static void assertEqualsRemoveSchemas( assertThat(actual.schemaIds()).containsExactlyInAnyOrderElementsOf(expected.schemaIds()); } + private static void assertEqualsRemoveSortOrders( + MetadataUpdate.RemoveSortOrders expected, MetadataUpdate.RemoveSortOrders actual) { + assertThat(actual.sortOrderIds()).containsExactlyInAnyOrderElementsOf(expected.sortOrderIds()); + } + private static void assertEqualsAddEncryptionKey( MetadataUpdate.AddEncryptionKey expected, MetadataUpdate.AddEncryptionKey actual) { assertThat(actual.key().keyId()).isEqualTo(expected.key().keyId()); diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java index a473e605e2f6..659265120be6 100644 --- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java @@ -1823,6 +1823,59 @@ public void testNoSchemasOrSpecsToRemove() { meta -> meta.changes().stream() .anyMatch(u -> u instanceof MetadataUpdate.RemoveSchemas))); + Mockito.verify(ops, Mockito.never()) + .commit( + any(), + argThat( + meta -> + meta.changes().stream() + .anyMatch(u -> u instanceof MetadataUpdate.RemoveSortOrders))); + } + + @TestTemplate + public void testRemoveSortOrders() { + SortOrder idSortOrder = SortOrder.builderFor(table.schema()).asc("id").build(); + DataFile initialFile = + DataFiles.builder(SPEC) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") // easy way to set partition data for now + .withRecordCount(1) + .withSortOrder(idSortOrder) + .build(); + table.newAppend().appendFile(initialFile).commit(); + + Set expectedDeletedFiles = Sets.newHashSet(); + expectedDeletedFiles.add(table.currentSnapshot().manifestListLocation()); + + table.replaceSortOrder().asc("data").commit(); + + DataFile dataFile = + DataFiles.builder(SPEC) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") // easy way to set partition data for now + .withRecordCount(1) + .withSortOrder(table.sortOrder()) + .build(); + table.newAppend().appendFile(dataFile).commit(); + expectedDeletedFiles.add(table.currentSnapshot().manifestListLocation()); + + table.newDelete().deleteFile(initialFile).commit(); + + assertThat(table.sortOrders()).hasSize(2); + + Set deletedFiles = Sets.newHashSet(); + // Expire all snapshots and sort-orders except the current ones. + removeSnapshots(table) + .expireOlderThan(System.currentTimeMillis()) + .cleanExpiredMetadata(true) + .deleteWith(deletedFiles::add) + .commit(); + + // other manifest files can be present, as there is delete file + assertThat(deletedFiles).containsAll(expectedDeletedFiles); + assertThat(table.sortOrders().values()).containsExactly(table.sortOrder()); } @TestTemplate diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index 4cfdd23bd39c..6cf7342a5fef 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -1960,4 +1960,22 @@ public void testMetadataWithRemoveSchemas() { assertThat(meta.changes()).anyMatch(u -> u instanceof MetadataUpdate.RemoveSchemas); } + + @Test + public void testMetadataWithRemoveSortOrders() { + TableMetadata base = + TableMetadata.newTableMetadata( + TestBase.SCHEMA, PartitionSpec.unpartitioned(), null, ImmutableMap.of()); + TableMetadata meta = + TableMetadata.buildFrom(base) + .addSortOrder(SortOrder.builderFor(base.schema()).asc("id").build()) + .addSortOrder(SortOrder.builderFor(base.schema()).asc("data").build()) + .removeSortOrders(Sets.newHashSet()) + .build(); + + assertThat(meta.changes()).noneMatch(u -> u instanceof MetadataUpdate.RemoveSortOrders); + + meta = TableMetadata.buildFrom(meta).removeSortOrders(Sets.newHashSet(1)).build(); + assertThat(meta.changes()).anyMatch(u -> u instanceof MetadataUpdate.RemoveSortOrders); + } } diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index c2fd24856fb2..485932bdd1df 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -1722,6 +1722,89 @@ public void testRemoveUnusedSchemas(boolean withBranch) { } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testRemoveUnusedSortOrders(boolean withBranch) { + String branch = "test"; + C catalog = catalog(); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(NS); + } + + Table table = + catalog + .buildTable(TABLE, SCHEMA) + .withPartitionSpec(SPEC) + .withProperty(TableProperties.GC_ENABLED, "true") + .create(); + + table.replaceSortOrder().asc("id").commit(); + SortOrder idSortOrder = table.sortOrder(); + + DataFile initialFile = + DataFiles.builder(SPEC) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withSortOrder(table.sortOrder()) + .withPartitionPath("id_bucket=0") + .withRecordCount(2) + .build(); + table.newFastAppend().appendFile(initialFile).commit(); + table.replaceSortOrder().asc(Expressions.bucket("id", 16)).commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + if (withBranch) { + table.manageSnapshots().createBranch(branch).commit(); + } + + table.replaceSortOrder().asc("data").commit(); + SortOrder dataSortOrder = table.sortOrder(); + + // including the initial default sort + assertThat(table.sortOrders().values()).as("Should have 4 total sort-orders").hasSize(4); + + table.expireSnapshots().cleanExpiredMetadata(true).commit(); + + Table loaded = catalog.loadTable(TABLE); + assertThat(loaded.snapshot(firstSnapshot.snapshotId())).isNotNull(); + assertThat(loaded.sortOrders()).as("Should have 2 total sort-order").hasSize(2); + assertThat(loaded.sortOrders().values()).containsExactlyInAnyOrder(idSortOrder, dataSortOrder); + + DataFile secondFile = + DataFiles.builder(SPEC) + .withPath("/path/to/data-b.parquet") + .withFileSizeInBytes(10) + .withSortOrder(table.sortOrder()) + .withPartitionPath("id_bucket=0") + .withRecordCount(2) + .build(); + table.newDelete().deleteFile(initialFile).commit(); + table.newFastAppend().appendFile(secondFile).commit(); + + table.replaceSortOrder().asc(Expressions.bucket("data", 8)).commit(); + table.replaceSortOrder().asc(Expressions.bucket("id", 8)).commit(); + SortOrder newSortOrder = table.sortOrder(); + + table + .expireSnapshots() + .expireOlderThan(table.currentSnapshot().timestampMillis()) + .cleanExpiredMetadata(true) + .commit(); + + loaded = catalog.loadTable(TABLE); + if (withBranch) { + assertThat(loaded.snapshots()) + .containsExactlyInAnyOrder(firstSnapshot, loaded.currentSnapshot()); + assertThat(loaded.sortOrders().values()) + .containsExactlyInAnyOrder(idSortOrder, newSortOrder, dataSortOrder); + } else { + assertThat(loaded.snapshot(firstSnapshot.snapshotId())).isNull(); + assertThat(loaded.sortOrders().values()) + .containsExactlyInAnyOrder(newSortOrder, dataSortOrder); + } + } + @Test public void testUpdateTableSortOrder() { C catalog = catalog();