diff --git a/api/src/main/java/org/apache/iceberg/RemoveUnusedSpecs.java b/api/src/main/java/org/apache/iceberg/RemoveUnusedSpecs.java
new file mode 100644
index 000000000000..21ef010b8692
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/RemoveUnusedSpecs.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+
+/**
+ * API for removing partition specs from the metadata that are not the default spec and are no longer referenced
+ * by any data files in the table.
+ * <p>
+ * When committing, these changes will be applied to the latest table metadata. Commit conflicts
+ * will be resolved by recalculating which specs are no longer in use against the latest metadata and
+ * retrying.
+ * <p>
+ * {@link #apply()} returns the specs that will remain if committed on the current metadata.
+ */
+public interface RemoveUnusedSpecs extends PendingUpdate<List<PartitionSpec>> {
+
+}
diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java
index 854d68b5887f..a79bb4190669 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -180,6 +180,13 @@ default String name() {
    */
   AppendFiles newAppend();
 
+  /**
+   * Remove any partition specs from the metadata that are no longer used in any data files. Always preserves
+   * the current default spec even if it has not yet been used.
+   * @return a new {@link RemoveUnusedSpecs}
+   */
+  RemoveUnusedSpecs removeUnusedSpecs();
+
   /**
    * Create a new {@link AppendFiles append API} to add files to this table and commit.
    * <p>
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
index 0a764f79fd76..26c6aded00a0 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -173,6 +173,11 @@ public UpdateProperties updateProperties() {
     throw new UnsupportedOperationException("Cannot update the properties of a metadata table");
   }
 
+  @Override
+  public RemoveUnusedSpecs removeUnusedSpecs() {
+    throw new UnsupportedOperationException("Cannot remove partition specs of a metadata table");
+  }
+
   @Override
   public ReplaceSortOrder replaceSortOrder() {
     throw new UnsupportedOperationException("Cannot update the sort order of a metadata table");
diff --git a/core/src/main/java/org/apache/iceberg/BaseRemoveUnusedSpecs.java b/core/src/main/java/org/apache/iceberg/BaseRemoveUnusedSpecs.java
new file mode 100644
index 000000000000..2603279db5b7
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BaseRemoveUnusedSpecs.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+
+import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+
+/**
+ * {@link RemoveUnusedSpecs} implementation that keeps the default partition spec and every spec whose id is
+ * reported by a manifest task of the {@code ALL_ENTRIES} metadata table scan, and drops all other specs.
+ */
+public class BaseRemoveUnusedSpecs implements RemoveUnusedSpecs {
+  private final TableOperations ops;
+  private final Table table;
+
+  public BaseRemoveUnusedSpecs(TableOperations ops, Table table) {
+    this.ops = ops;
+    this.table = table;
+  }
+
+  /**
+   * Refreshes the table metadata and computes the specs that would remain after the removal.
+   *
+   * @return the partition specs that are kept
+   */
+  @Override
+  public List<PartitionSpec> apply() {
+    TableMetadata current = ops.refresh();
+    TableMetadata newMetadata = removeUnusedSpecs(current);
+    return newMetadata.specs();
+  }
+
+  /**
+   * Commits the pruned spec list. The pruning is recomputed from freshly refreshed metadata on every attempt, and
+   * attempts that fail with {@link CommitFailedException} are retried using the table's commit retry properties.
+   */
+  @Override
+  public void commit() {
+    TableMetadata base = ops.refresh();
+    Tasks.foreach(ops)
+        .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+        .exponentialBackoff(
+            base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+            base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+            base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+            2.0 /* exponential */)
+        .onlyRetryOn(CommitFailedException.class)
+        .run(taskOps -> {
+          TableMetadata current = ops.refresh();
+          TableMetadata newMetadata = removeUnusedSpecs(current);
+          taskOps.commit(current, newMetadata);
+        });
+  }
+
+  /**
+   * Builds metadata that retains only the default spec and the specs that are still in use.
+   *
+   * @param current the metadata to prune
+   * @return metadata derived from {@code current} that contains only the remaining specs
+   */
+  private TableMetadata removeUnusedSpecs(TableMetadata current) {
+    List<PartitionSpec> specs = current.specs();
+    int currentSpecId = current.defaultSpecId();
+
+    // Collect the spec id of every manifest task; the planned tasks are closed as soon as they have been consumed
+    Set<Integer> specsInUse = Sets.newHashSet();
+    try (CloseableIterable<FileScanTask> tasks =
+        MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.ALL_ENTRIES)
+            .newScan()
+            .planFiles()) {
+      for (FileScanTask task : tasks) {
+        specsInUse.add(((ManifestEntriesTable.ManifestReadTask) task).partitionSpecId());
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close manifest entries scan", e);
+    }
+
+    List<PartitionSpec> remainingSpecs = specs.stream()
+        .filter(spec -> spec.specId() == currentSpecId || specsInUse.contains(spec.specId()))
+        .collect(Collectors.toList());
+
+    return current.withSpecs(remainingSpecs);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java
index b9886f0a1e68..553198902f2a 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -144,6 +144,11 @@ public ReplaceSortOrder replaceSortOrder() {
     return new BaseReplaceSortOrder(ops);
   }
 
+  @Override
+  public RemoveUnusedSpecs removeUnusedSpecs() {
+    return new BaseRemoveUnusedSpecs(ops, this);
+  }
+
   @Override
   public UpdateLocation updateLocation() {
     return new SetLocation(ops);
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 3d0b31e49901..3b831aa7515c 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -597,6 +597,11 @@ public UpdateProperties updateProperties() {
       return BaseTransaction.this.updateProperties();
     }
 
+    @Override
+    public RemoveUnusedSpecs removeUnusedSpecs() {
+      throw new UnsupportedOperationException("Cannot remove unused partition specs as part of a transaction");
+    }
+
     @Override
     public ReplaceSortOrder replaceSortOrder() {
       return BaseTransaction.this.replaceSortOrder();
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index a44fc6421428..3f29d043a960 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -163,5 +163,12 @@ public CloseableIterable<StructLike> rows() {
     public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
+
+    /**
+     * Returns the partition spec id of this task's manifest.
+     */
+    int partitionSpecId() {
+      return manifest.partitionSpecId();
+    }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java
index 492a9006b2cd..c4919962f83a 100644
--- a/core/src/main/java/org/apache/iceberg/SerializableTable.java
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -291,6 +291,11 @@ public AppendFiles newAppend() {
     throw new UnsupportedOperationException(errorMsg("newAppend"));
   }
 
+  @Override
+  public RemoveUnusedSpecs removeUnusedSpecs() {
+    throw new UnsupportedOperationException(errorMsg("removeUnusedSpecs"));
+  }
+
   @Override
   public RewriteFiles newRewrite() {
     throw new UnsupportedOperationException(errorMsg("newRewrite"));
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 2177207c07b8..3312b7c133e4 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -717,6 +717,22 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
         snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
 
+  /**
+   * Returns new metadata that uses {@code newSpecs} as its partition specs and keeps the current default spec id.
+   * <p>
+   * Only use when it has been externally validated that the new specs completely cover all currently
+   * referenceable data files.
+   *
+   * @param newSpecs the partition specs to retain
+   * @return table metadata containing only {@code newSpecs}
+   */
+  TableMetadata withSpecs(List<PartitionSpec> newSpecs) {
+    return new TableMetadata(null, formatVersion, uuid, location,
+        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, newSpecs,
+        lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
+        snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
+  }
+
   private PartitionSpec reassignPartitionIds(PartitionSpec partitionSpec, TypeUtil.NextID nextID) {
     PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(partitionSpec.schema())
         .withSpecId(partitionSpec.specId());
diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveUnusedSpecs.java b/core/src/test/java/org/apache/iceberg/TestRemoveUnusedSpecs.java
new file mode 100644
index 000000000000..918b33e5078e
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestRemoveUnusedSpecs.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestRemoveUnusedSpecs extends TableTestBase {
+
+  @Parameterized.Parameters(name = "formatVersion = {0}")
+  public static Object[] parameters() {
+    return new Object[] { 1, 2 };
+  }
+
+  public TestRemoveUnusedSpecs(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Test
+  public void testRemoveAllButCurrent() {
+    table.updateSchema()
+        .addColumn("ts", Types.TimestampType.withoutZone())
+        .addColumn("category", Types.StringType.get())
+        .commit();
+    table.updateSpec().addField("id").commit();
+    table.updateSpec().addField("ts").commit();
+    table.updateSpec().addField("category").commit();
+    table.updateSpec().addField("data").commit();
+    Assert.assertEquals(5, table.specs().size());
+
+    PartitionSpec currentSpec = table.spec();
+    table.removeUnusedSpecs().commit();
+
+    Assert.assertEquals(1, table.specs().size());
+    Assert.assertEquals(currentSpec, table.spec());
+  }
+
+  @Test
+  public void testDontRemoveInUseSpecsV2() {
+    Assume.assumeTrue("V2", formatVersion == 2);
+
+    table.updateSchema()
+        .addColumn("ts", Types.LongType.get())
+        .addColumn("category", Types.StringType.get())
+        .commit();
+
+    table.updateSpec().addField("id").commit(); // 1
+    table.newAppend().appendFile(newDataFile("data_bucket=0/id=5")).commit();
+
+    table.updateSpec().addField("ts").commit(); // 2
+
+    table.updateSpec().addField("category").commit(); // 3
+    table.newRowDelta()
+        .addDeletes(newDeleteFile(table.spec().specId(), "data_bucket=0/id=5/ts=100/category=fo"))
+        .commit();
+
+    table.updateSpec().addField("data").commit(); // 4
+    Assert.assertEquals(5, table.specs().size());
+
+    PartitionSpec currentSpec = table.spec();
+    table.removeUnusedSpecs().commit();
+
+    Assert.assertEquals("Missing required spec", ImmutableSet.of(1, 3, 4), table.specs().keySet());
+    Assert.assertEquals(currentSpec, table.spec());
+  }
+
+  @Test
+  public void testDontRemoveInUseSpecs() {
+    // Only data files are written here, so this scenario is not restricted to format version 2
+
+    table.updateSchema()
+        .addColumn("ts", Types.LongType.get())
+        .addColumn("category", Types.StringType.get())
+        .commit();
+
+    table.updateSpec().addField("id").commit(); // 1
+    table.newAppend().appendFile(newDataFile("data_bucket=0/id=5")).commit();
+
+    table.updateSpec().addField("ts").commit(); // 2
+
+    table.updateSpec().addField("category").commit(); // 3
+    table.newAppend().appendFile(newDataFile("data_bucket=0/id=5/ts=100/category=fo")).commit();
+
+    table.updateSpec().addField("data").commit(); // 4
+    Assert.assertEquals(5, table.specs().size());
+
+    PartitionSpec currentSpec = table.spec();
+    table.removeUnusedSpecs().commit();
+
+    Assert.assertEquals("Missing required spec", ImmutableSet.of(1, 3, 4), table.specs().keySet());
+    Assert.assertEquals(currentSpec, table.spec());
+  }
+}